1/*
2NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5This program is free software: you can redistribute it and/or modify
6it under the terms of the GNU General Public License as published by
7the Free Software Foundation, version 3 of the License.
8
9This program is distributed in the hope that it will be useful,
10but WITHOUT ANY WARRANTY; without even the implied warranty of
11MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12GNU General Public License for more details.
13
14You should have received a copy of the GNU General Public License
15along with this program.  If not, see <http://www.gnu.org/licenses/>.
16*/
17
18// Create/digest stream of NNCP encrypted packets.
19package main
20
21import (
22	"archive/tar"
23	"bufio"
24	"bytes"
25	"errors"
26	"flag"
27	"fmt"
28	"io"
29	"io/ioutil"
30	"log"
31	"os"
32	"path/filepath"
33	"strings"
34
35	xdr "github.com/davecgh/go-xdr/xdr2"
36	"github.com/dustin/go-humanize"
37	"go.cypherpunks.ru/nncp/v8"
38)
39
40const (
41	CopyBufSize = 1 << 17
42)
43
44func usage() {
45	fmt.Fprintf(os.Stderr, nncp.UsageHeader())
46	fmt.Fprintf(os.Stderr, "nncp-bundle -- Create/digest stream of NNCP encrypted packets\n\n")
47	fmt.Fprintf(os.Stderr, "Usage: %s [options] -tx [-delete] NODE [NODE ...] > ...\n", os.Args[0])
48	fmt.Fprintf(os.Stderr, "       %s [options] -rx -delete [-dryrun] [NODE ...] < ...\n", os.Args[0])
49	fmt.Fprintf(os.Stderr, "       %s [options] -rx [-check] [-dryrun] [NODE ...] < ...\n", os.Args[0])
50	fmt.Fprintln(os.Stderr, "Options:")
51	flag.PrintDefaults()
52}
53
54func main() {
55	var (
56		cfgPath   = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file")
57		niceRaw   = flag.String("nice", nncp.NicenessFmt(255), "Minimal required niceness")
58		doRx      = flag.Bool("rx", false, "Receive packets")
59		doTx      = flag.Bool("tx", false, "Transfer packets")
60		doDelete  = flag.Bool("delete", false, "Delete transferred packets")
61		doCheck   = flag.Bool("check", false, "Check integrity while receiving")
62		dryRun    = flag.Bool("dryrun", false, "Do no writes")
63		spoolPath = flag.String("spool", "", "Override path to spool")
64		logPath   = flag.String("log", "", "Override path to logfile")
65		quiet     = flag.Bool("quiet", false, "Print only errors")
66		showPrgrs = flag.Bool("progress", false, "Force progress showing")
67		omitPrgrs = flag.Bool("noprogress", false, "Omit progress showing")
68		debug     = flag.Bool("debug", false, "Print debug messages")
69		version   = flag.Bool("version", false, "Print version information")
70		warranty  = flag.Bool("warranty", false, "Print warranty information")
71	)
72	log.SetFlags(log.Lshortfile)
73	flag.Usage = usage
74	flag.Parse()
75	if *warranty {
76		fmt.Println(nncp.Warranty)
77		return
78	}
79	if *version {
80		fmt.Println(nncp.VersionGet())
81		return
82	}
83	nice, err := nncp.NicenessParse(*niceRaw)
84	if err != nil {
85		log.Fatalln(err)
86	}
87	if *doRx && *doTx {
88		log.Fatalln("-rx and -tx can not be set simultaneously")
89	}
90	if !*doRx && !*doTx {
91		log.Fatalln("At least one of -rx and -tx must be specified")
92	}
93
94	ctx, err := nncp.CtxFromCmdline(
95		*cfgPath,
96		*spoolPath,
97		*logPath,
98		*quiet,
99		*showPrgrs,
100		*omitPrgrs,
101		*debug,
102	)
103	if err != nil {
104		log.Fatalln("Error during initialization:", err)
105	}
106
107	nodeIds := make(map[nncp.NodeId]struct{}, flag.NArg())
108	for i := 0; i < flag.NArg(); i++ {
109		node, err := ctx.FindNode(flag.Arg(i))
110		if err != nil {
111			log.Fatalln("Invalid node specified:", err)
112		}
113		nodeIds[*node.Id] = struct{}{}
114	}
115
116	ctx.Umask()
117
118	if *doTx {
119		var pktName string
120		bufStdout := bufio.NewWriter(os.Stdout)
121		tarWr := tar.NewWriter(bufStdout)
122		for nodeId := range nodeIds {
123			for job := range ctx.Jobs(&nodeId, nncp.TTx) {
124				pktName = filepath.Base(job.Path)
125				les := nncp.LEs{
126					{K: "XX", V: string(nncp.TTx)},
127					{K: "Node", V: nodeId.String()},
128					{K: "Pkt", V: pktName},
129				}
130				if job.PktEnc.Nice > nice {
131					ctx.LogD("bundle-tx-too-nice", les, func(les nncp.LEs) string {
132						return fmt.Sprintf(
133							"Bundle transfer %s/tx/%s: too nice %s",
134							ctx.NodeName(&nodeId),
135							pktName,
136							nncp.NicenessFmt(job.PktEnc.Nice),
137						)
138					})
139					continue
140				}
141				fd, err := os.Open(job.Path)
142				if err != nil {
143					log.Fatalln("Error during opening:", err)
144				}
145				if err = tarWr.WriteHeader(&tar.Header{
146					Format:   tar.FormatUSTAR,
147					Name:     nncp.NNCPBundlePrefix,
148					Mode:     0700,
149					Typeflag: tar.TypeDir,
150				}); err != nil {
151					log.Fatalln("Error writing tar header:", err)
152				}
153				if err = tarWr.WriteHeader(&tar.Header{
154					Format: tar.FormatPAX,
155					Name: strings.Join([]string{
156						nncp.NNCPBundlePrefix,
157						nodeId.String(),
158						ctx.SelfId.String(),
159						pktName,
160					}, "/"),
161					Mode:     0400,
162					Size:     job.Size,
163					Typeflag: tar.TypeReg,
164				}); err != nil {
165					log.Fatalln("Error writing tar header:", err)
166				}
167				if _, err = nncp.CopyProgressed(
168					tarWr, bufio.NewReader(fd), "Tx",
169					append(les, nncp.LEs{
170						{K: "Pkt", V: nncp.Base32Codec.EncodeToString(job.HshValue[:])},
171						{K: "FullSize", V: job.Size},
172					}...),
173					ctx.ShowPrgrs,
174				); err != nil {
175					log.Fatalln("Error during copying to tar:", err)
176				}
177				if err = fd.Close(); err != nil {
178					log.Fatalln("Error during closing:", err)
179				}
180				if err = tarWr.Flush(); err != nil {
181					log.Fatalln("Error during tar flushing:", err)
182				}
183				if err = bufStdout.Flush(); err != nil {
184					log.Fatalln("Error during stdout flushing:", err)
185				}
186				if *doDelete {
187					if err = os.Remove(job.Path); err != nil {
188						log.Fatalln("Error during deletion:", err)
189					} else if ctx.HdrUsage {
190						os.Remove(nncp.JobPath2Hdr(job.Path))
191					}
192				}
193				ctx.LogI(
194					"bundle-tx",
195					append(les, nncp.LE{K: "Size", V: job.Size}),
196					func(les nncp.LEs) string {
197						return fmt.Sprintf(
198							"Bundle transfer, sent to node %s %s (%s)",
199							ctx.NodeName(&nodeId),
200							pktName,
201							humanize.IBytes(uint64(job.Size)),
202						)
203					},
204				)
205			}
206		}
207		if err = tarWr.Close(); err != nil {
208			log.Fatalln("Error during tar closing:", err)
209		}
210	} else {
211		bufStdin := bufio.NewReaderSize(os.Stdin, CopyBufSize*2)
212		pktEncBuf := make([]byte, nncp.PktEncOverhead)
213		var pktEnc *nncp.PktEnc
214		for {
215			peeked, err := bufStdin.Peek(CopyBufSize)
216			if err != nil && err != io.EOF {
217				log.Fatalln("Error during reading:", err)
218			}
219			prefixIdx := bytes.Index(peeked, []byte(nncp.NNCPBundlePrefix))
220			if prefixIdx == -1 {
221				if err == io.EOF {
222					break
223				}
224				bufStdin.Discard(bufStdin.Buffered() - (len(nncp.NNCPBundlePrefix) - 1))
225				continue
226			}
227			if _, err = bufStdin.Discard(prefixIdx); err != nil {
228				panic(err)
229			}
230			tarR := tar.NewReader(bufStdin)
231			entry, err := tarR.Next()
232			if err != nil {
233				if err != io.EOF {
234					ctx.LogD(
235						"bundle-rx-read-tar",
236						nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}},
237						func(les nncp.LEs) string {
238							return "Bundle transfer rx: reading tar"
239						},
240					)
241				}
242				continue
243			}
244			if entry.Typeflag != tar.TypeDir {
245				ctx.LogD(
246					"bundle-rx-read-tar",
247					nncp.LEs{
248						{K: "XX", V: string(nncp.TRx)},
249						{K: "Err", V: errors.New("expected NNCP/")},
250					},
251					func(les nncp.LEs) string {
252						return "Bundle transfer rx: reading tar"
253					},
254				)
255				continue
256			}
257			entry, err = tarR.Next()
258			if err != nil {
259				if err != io.EOF {
260					ctx.LogD(
261						"bundle-rx-read-tar",
262						nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}},
263						func(les nncp.LEs) string {
264							return "Bundle transfer rx: reading tar"
265						},
266					)
267				}
268				continue
269			}
270			les := nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Pkt", V: entry.Name}}
271			logMsg := func(les nncp.LEs) string {
272				return "Bundle transfer rx/" + entry.Name
273			}
274			if entry.Size < nncp.PktEncOverhead {
275				ctx.LogD("bundle-rx-too-small", les, func(les nncp.LEs) string {
276					return logMsg(les) + ": too small packet"
277				})
278				continue
279			}
280			if !ctx.IsEnoughSpace(entry.Size) {
281				ctx.LogE("bundle-rx", les, errors.New("not enough spool space"), logMsg)
282				continue
283			}
284			pktName := filepath.Base(entry.Name)
285			if _, err = nncp.Base32Codec.DecodeString(pktName); err != nil {
286				ctx.LogD(
287					"bundle-rx",
288					append(les, nncp.LE{K: "Err", V: "bad packet name"}),
289					logMsg,
290				)
291				continue
292			}
293			if _, err = io.ReadFull(tarR, pktEncBuf); err != nil {
294				ctx.LogD(
295					"bundle-rx",
296					append(les, nncp.LE{K: "Err", V: err}),
297					logMsg,
298				)
299				continue
300			}
301			if _, err = xdr.Unmarshal(bytes.NewReader(pktEncBuf), &pktEnc); err != nil {
302				ctx.LogD(
303					"bundle-rx",
304					append(les, nncp.LE{K: "Err", V: "Bad packet structure"}),
305					logMsg,
306				)
307				continue
308			}
309			switch pktEnc.Magic {
310			case nncp.MagicNNCPEv1.B:
311				err = nncp.MagicNNCPEv1.TooOld()
312			case nncp.MagicNNCPEv2.B:
313				err = nncp.MagicNNCPEv2.TooOld()
314			case nncp.MagicNNCPEv3.B:
315				err = nncp.MagicNNCPEv3.TooOld()
316			case nncp.MagicNNCPEv4.B:
317				err = nncp.MagicNNCPEv4.TooOld()
318			case nncp.MagicNNCPEv5.B:
319			default:
320				err = errors.New("Bad packet magic number")
321			}
322			if err != nil {
323				ctx.LogD(
324					"bundle-rx",
325					append(les, nncp.LE{K: "Err", V: err.Error()}),
326					logMsg,
327				)
328				continue
329			}
330			if pktEnc.Nice > nice {
331				ctx.LogD("bundle-rx-too-nice", les, func(les nncp.LEs) string {
332					return logMsg(les) + ": too nice"
333				})
334				continue
335			}
336			if *pktEnc.Sender == *ctx.SelfId && *doDelete {
337				if len(nodeIds) > 0 {
338					if _, exists := nodeIds[*pktEnc.Recipient]; !exists {
339						ctx.LogD("bundle-tx-skip", les, func(les nncp.LEs) string {
340							return logMsg(les) + ": recipient is not requested"
341						})
342						continue
343					}
344				}
345				nodeId32 := nncp.Base32Codec.EncodeToString(pktEnc.Recipient[:])
346				les := nncp.LEs{
347					{K: "XX", V: string(nncp.TTx)},
348					{K: "Node", V: nodeId32},
349					{K: "Pkt", V: pktName},
350				}
351				logMsg = func(les nncp.LEs) string {
352					return fmt.Sprintf("Bundle transfer %s/tx/%s", nodeId32, pktName)
353				}
354				dstPath := filepath.Join(ctx.Spool, nodeId32, string(nncp.TTx), pktName)
355				if _, err = os.Stat(dstPath); err != nil {
356					ctx.LogD("bundle-tx-missing", les, func(les nncp.LEs) string {
357						return logMsg(les) + ": packet is already missing"
358					})
359					continue
360				}
361				hsh := nncp.MTHNew(entry.Size, 0)
362				if _, err = hsh.Write(pktEncBuf); err != nil {
363					log.Fatalln("Error during writing:", err)
364				}
365				if _, err = nncp.CopyProgressed(
366					hsh, tarR, "Rx",
367					append(les, nncp.LE{K: "FullSize", V: entry.Size}),
368					ctx.ShowPrgrs,
369				); err != nil {
370					log.Fatalln("Error during copying:", err)
371				}
372				if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) == pktName {
373					ctx.LogI("bundle-tx-removed", les, func(les nncp.LEs) string {
374						return logMsg(les) + ": removed"
375					})
376					if !*dryRun {
377						os.Remove(dstPath)
378						if ctx.HdrUsage {
379							os.Remove(nncp.JobPath2Hdr(dstPath))
380						}
381					}
382				} else {
383					ctx.LogE("bundle-tx", les, errors.New("bad checksum"), logMsg)
384				}
385				continue
386			}
387			if *pktEnc.Recipient != *ctx.SelfId {
388				ctx.LogD("nncp-bundle", les, func(les nncp.LEs) string {
389					return logMsg(les) + ": unknown recipient"
390				})
391				continue
392			}
393			if len(nodeIds) > 0 {
394				if _, exists := nodeIds[*pktEnc.Sender]; !exists {
395					ctx.LogD("bundle-rx-skip", les, func(les nncp.LEs) string {
396						return logMsg(les) + ": sender is not requested"
397					})
398					continue
399				}
400			}
401			sender := nncp.Base32Codec.EncodeToString(pktEnc.Sender[:])
402			les = nncp.LEs{
403				{K: "XX", V: string(nncp.TRx)},
404				{K: "Node", V: sender},
405				{K: "Pkt", V: pktName},
406				{K: "FullSize", V: entry.Size},
407			}
408			logMsg = func(les nncp.LEs) string {
409				return fmt.Sprintf("Bundle transfer %s/rx/%s", sender, pktName)
410			}
411			dstDirPath := filepath.Join(ctx.Spool, sender, string(nncp.TRx))
412			dstPath := filepath.Join(dstDirPath, pktName)
413			if _, err = os.Stat(dstPath); err == nil || !os.IsNotExist(err) {
414				ctx.LogD("bundle-rx-exists", les, func(les nncp.LEs) string {
415					return logMsg(les) + ": packet already exists"
416				})
417				continue
418			}
419			if _, err = os.Stat(filepath.Join(
420				dstDirPath, nncp.SeenDir, pktName,
421			)); err == nil || !os.IsNotExist(err) {
422				ctx.LogD("bundle-rx-seen", les, func(les nncp.LEs) string {
423					return logMsg(les) + ": packet already seen"
424				})
425				continue
426			}
427			if *doCheck {
428				if *dryRun {
429					hsh := nncp.MTHNew(entry.Size, 0)
430					if _, err = hsh.Write(pktEncBuf); err != nil {
431						log.Fatalln("Error during writing:", err)
432					}
433					if _, err = nncp.CopyProgressed(hsh, tarR, "check", les, ctx.ShowPrgrs); err != nil {
434						log.Fatalln("Error during copying:", err)
435					}
436					if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) != pktName {
437						ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg)
438						continue
439					}
440				} else {
441					tmp, err := ctx.NewTmpFileWHash()
442					if err != nil {
443						log.Fatalln("Error during temporary file creation:", err)
444					}
445					if _, err = tmp.W.Write(pktEncBuf); err != nil {
446						log.Fatalln("Error during writing:", err)
447					}
448					if _, err = nncp.CopyProgressed(tmp.W, tarR, "check", les, ctx.ShowPrgrs); err != nil {
449						log.Fatalln("Error during copying:", err)
450					}
451					if err = tmp.W.Flush(); err != nil {
452						log.Fatalln("Error during flusing:", err)
453					}
454					if nncp.Base32Codec.EncodeToString(tmp.Hsh.Sum(nil)) == pktName {
455						if err = tmp.Commit(dstDirPath); err != nil {
456							log.Fatalln("Error during commiting:", err)
457						}
458					} else {
459						ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg)
460						tmp.Cancel()
461						continue
462					}
463				}
464			} else {
465				if *dryRun {
466					if _, err = nncp.CopyProgressed(ioutil.Discard, tarR, "Rx", les, ctx.ShowPrgrs); err != nil {
467						log.Fatalln("Error during copying:", err)
468					}
469				} else {
470					tmp, err := ctx.NewTmpFile()
471					if err != nil {
472						log.Fatalln("Error during temporary file creation:", err)
473					}
474					bufTmp := bufio.NewWriterSize(tmp, CopyBufSize)
475					if _, err = bufTmp.Write(pktEncBuf); err != nil {
476						log.Fatalln("Error during writing:", err)
477					}
478					if _, err = nncp.CopyProgressed(bufTmp, tarR, "Rx", les, ctx.ShowPrgrs); err != nil {
479						log.Fatalln("Error during copying:", err)
480					}
481					if err = bufTmp.Flush(); err != nil {
482						log.Fatalln("Error during flushing:", err)
483					}
484					if err = tmp.Sync(); err != nil {
485						log.Fatalln("Error during syncing:", err)
486					}
487					if err = tmp.Close(); err != nil {
488						log.Fatalln("Error during closing:", err)
489					}
490					if err = os.MkdirAll(dstDirPath, os.FileMode(0777)); err != nil {
491						log.Fatalln("Error during mkdir:", err)
492					}
493					if err = os.Rename(tmp.Name(), dstPath); err != nil {
494						log.Fatalln("Error during renaming:", err)
495					}
496					if err = nncp.DirSync(dstDirPath); err != nil {
497						log.Fatalln("Error during syncing:", err)
498					}
499					if ctx.HdrUsage {
500						ctx.HdrWrite(pktEncBuf, dstPath)
501					}
502				}
503			}
504			for _, le := range les {
505				if le.K == "FullSize" {
506					les = append(les, nncp.LE{K: "Size", V: le.V})
507					break
508				}
509			}
510			ctx.LogI("bundle-rx", les, func(les nncp.LEs) string {
511				return fmt.Sprintf(
512					"Bundle transfer, received from %s %s (%s)",
513					sender, pktName, humanize.IBytes(uint64(entry.Size)),
514				)
515			})
516		}
517	}
518}
519