1/* 2NNCP -- Node to Node copy, utilities for store-and-forward data exchange 3Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org> 4 5This program is free software: you can redistribute it and/or modify 6it under the terms of the GNU General Public License as published by 7the Free Software Foundation, version 3 of the License. 8 9This program is distributed in the hope that it will be useful, 10but WITHOUT ANY WARRANTY; without even the implied warranty of 11MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12GNU General Public License for more details. 13 14You should have received a copy of the GNU General Public License 15along with this program. If not, see <http://www.gnu.org/licenses/>. 16*/ 17 18package nncp 19 20import ( 21 "archive/tar" 22 "bufio" 23 "bytes" 24 "errors" 25 "fmt" 26 "io" 27 "os" 28 "path/filepath" 29 "strconv" 30 "strings" 31 "time" 32 33 xdr "github.com/davecgh/go-xdr/xdr2" 34 "github.com/dustin/go-humanize" 35 "github.com/klauspost/compress/zstd" 36 "golang.org/x/crypto/blake2b" 37) 38 39const ( 40 MaxFileSize = 1 << 62 41 42 TarBlockSize = 512 43 TarExt = ".tar" 44) 45 46type PktEncWriteResult struct { 47 pktEncRaw []byte 48 size int64 49 err error 50} 51 52func (ctx *Ctx) Tx( 53 node *Node, 54 pkt *Pkt, 55 nice uint8, 56 srcSize, minSize, maxSize int64, 57 src io.Reader, 58 pktName string, 59 areaId *AreaId, 60) (*Node, int64, error) { 61 var area *Area 62 if areaId != nil { 63 area = ctx.AreaId2Area[*areaId] 64 if area.Prv == nil { 65 return nil, 0, errors.New("area has no encryption keys") 66 } 67 } 68 hops := make([]*Node, 0, 1+len(node.Via)) 69 hops = append(hops, node) 70 lastNode := node 71 for i := len(node.Via); i > 0; i-- { 72 lastNode = ctx.Neigh[*node.Via[i-1]] 73 hops = append(hops, lastNode) 74 } 75 wrappers := len(hops) 76 if area != nil { 77 wrappers++ 78 } 79 var expectedSize int64 80 if srcSize > 0 { 81 expectedSize = srcSize + PktOverhead 82 expectedSize += sizePadCalc(expectedSize, minSize, wrappers) 83 expectedSize = PktEncOverhead + sizeWithTags(expectedSize) 84 if maxSize != 0 && expectedSize > maxSize { 85 return nil, 0, TooBig 86 } 87 if !ctx.IsEnoughSpace(expectedSize) { 88 return nil, 0, errors.New("is not enough space") 89 } 90 } 91 tmp, err := ctx.NewTmpFileWHash() 92 if err != nil { 93 return nil, 0, err 94 } 95 96 results := make(chan PktEncWriteResult) 97 pipeR, pipeW := io.Pipe() 98 var pipeRPrev io.Reader 99 if area == nil { 100 go func(src io.Reader, dst io.WriteCloser) { 101 ctx.LogD("tx", LEs{ 102 {"Node", hops[0].Id}, 103 {"Nice", int(nice)}, 104 {"Size", expectedSize}, 105 }, func(les LEs) string { 106 return fmt.Sprintf( 107 "Tx packet to %s (source %s) nice: %s", 108 ctx.NodeName(hops[0].Id), 109 humanize.IBytes(uint64(expectedSize)), 110 NicenessFmt(nice), 111 ) 112 }) 113 pktEncRaw, size, err := PktEncWrite( 114 ctx.Self, hops[0], pkt, nice, minSize, maxSize, wrappers, src, dst, 115 ) 116 results <- PktEncWriteResult{pktEncRaw, size, err} 117 dst.Close() 118 }(src, pipeW) 119 } else { 120 go func(src io.Reader, dst io.WriteCloser) { 121 ctx.LogD("tx", LEs{ 122 {"Area", area.Id}, 123 {"Nice", int(nice)}, 124 {"Size", expectedSize}, 125 }, func(les LEs) string { 126 return fmt.Sprintf( 127 "Tx area packet to %s (source %s) nice: %s", 128 ctx.AreaName(areaId), 129 humanize.IBytes(uint64(expectedSize)), 130 NicenessFmt(nice), 131 ) 132 }) 133 areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)} 134 copy(areaNode.Id[:], area.Id[:]) 135 copy(areaNode.ExchPub[:], area.Pub[:]) 136 pktEncRaw, size, err := PktEncWrite( 137 ctx.Self, &areaNode, pkt, nice, 0, maxSize, 0, src, dst, 138 ) 139 results <- PktEncWriteResult{pktEncRaw, size, err} 140 dst.Close() 141 }(src, pipeW) 142 pipeRPrev = pipeR 143 pipeR, pipeW = io.Pipe() 144 go func(src io.Reader, dst io.WriteCloser) { 145 pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:]) 146 if err != nil { 147 panic(err) 148 } 149 ctx.LogD("tx", LEs{ 150 {"Node", hops[0].Id}, 151 {"Nice", int(nice)}, 152 {"Size", expectedSize}, 153 }, func(les LEs) string { 154 return fmt.Sprintf( 155 "Tx packet to %s (source %s) nice: %s", 156 ctx.NodeName(hops[0].Id), 157 humanize.IBytes(uint64(expectedSize)), 158 NicenessFmt(nice), 159 ) 160 }) 161 pktEncRaw, size, err := PktEncWrite( 162 ctx.Self, hops[0], pktArea, nice, minSize, maxSize, wrappers, src, dst, 163 ) 164 results <- PktEncWriteResult{pktEncRaw, size, err} 165 dst.Close() 166 }(pipeRPrev, pipeW) 167 } 168 for i := 1; i < len(hops); i++ { 169 pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:]) 170 if err != nil { 171 panic(err) 172 } 173 pipeRPrev = pipeR 174 pipeR, pipeW = io.Pipe() 175 go func(node *Node, pkt *Pkt, src io.Reader, dst io.WriteCloser) { 176 ctx.LogD("tx", LEs{ 177 {"Node", node.Id}, 178 {"Nice", int(nice)}, 179 }, func(les LEs) string { 180 return fmt.Sprintf( 181 "Tx trns packet to %s nice: %s", 182 ctx.NodeName(node.Id), 183 NicenessFmt(nice), 184 ) 185 }) 186 pktEncRaw, size, err := PktEncWrite( 187 ctx.Self, node, pkt, nice, 0, MaxFileSize, 0, src, dst, 188 ) 189 results <- PktEncWriteResult{pktEncRaw, size, err} 190 dst.Close() 191 }(hops[i], pktTrns, pipeRPrev, pipeW) 192 } 193 go func() { 194 _, err := CopyProgressed( 195 tmp.W, pipeR, "Tx", 196 LEs{{"Pkt", pktName}, {"FullSize", expectedSize}}, 197 ctx.ShowPrgrs, 198 ) 199 results <- PktEncWriteResult{err: err} 200 }() 201 var pktEncRaw []byte 202 var pktEncMsg []byte 203 var payloadSize int64 204 if area != nil { 205 r := <-results 206 payloadSize = r.size 207 pktEncMsg = r.pktEncRaw 208 wrappers-- 209 } 210 for i := 0; i <= wrappers; i++ { 211 r := <-results 212 if r.err != nil { 213 tmp.Fd.Close() 214 return nil, 0, err 215 } 216 if r.pktEncRaw != nil { 217 pktEncRaw = r.pktEncRaw 218 if payloadSize == 0 { 219 payloadSize = r.size 220 } 221 } 222 } 223 nodePath := filepath.Join(ctx.Spool, lastNode.Id.String()) 224 err = tmp.Commit(filepath.Join(nodePath, string(TTx))) 225 os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) 226 if err != nil { 227 return lastNode, 0, err 228 } 229 if ctx.HdrUsage { 230 ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum())) 231 } 232 if area != nil { 233 msgHashRaw := blake2b.Sum256(pktEncMsg) 234 msgHash := Base32Codec.EncodeToString(msgHashRaw[:]) 235 seenDir := filepath.Join( 236 ctx.Spool, ctx.SelfId.String(), AreaDir, areaId.String(), 237 ) 238 seenPath := filepath.Join(seenDir, msgHash) 239 les := LEs{ 240 {"Node", node.Id}, 241 {"Nice", int(nice)}, 242 {"Size", expectedSize}, 243 {"Area", areaId}, 244 {"AreaMsg", msgHash}, 245 } 246 logMsg := func(les LEs) string { 247 return fmt.Sprintf( 248 "Tx area packet to %s (source %s) nice: %s, area %s: %s", 249 ctx.NodeName(node.Id), 250 humanize.IBytes(uint64(expectedSize)), 251 NicenessFmt(nice), 252 area.Name, 253 msgHash, 254 ) 255 } 256 if err = ensureDir(seenDir); err != nil { 257 ctx.LogE("tx-mkdir", les, err, logMsg) 258 return lastNode, 0, err 259 } 260 if fd, err := os.Create(seenPath); err == nil { 261 fd.Close() 262 if err = DirSync(seenDir); err != nil { 263 ctx.LogE("tx-dirsync", les, err, logMsg) 264 return lastNode, 0, err 265 } 266 } 267 ctx.LogI("tx-area", les, logMsg) 268 } 269 return lastNode, payloadSize, err 270} 271 272type DummyCloser struct{} 273 274func (dc DummyCloser) Close() error { return nil } 275 276func prepareTxFile(srcPath string) ( 277 reader io.Reader, 278 closer io.Closer, 279 srcSize int64, 280 archived bool, 281 rerr error, 282) { 283 if srcPath == "-" { 284 reader = os.Stdin 285 closer = os.Stdin 286 return 287 } 288 289 srcStat, err := os.Stat(srcPath) 290 if err != nil { 291 rerr = err 292 return 293 } 294 mode := srcStat.Mode() 295 296 if mode.IsRegular() { 297 // It is regular file, just send it 298 src, err := os.Open(srcPath) 299 if err != nil { 300 rerr = err 301 return 302 } 303 reader = src 304 closer = src 305 srcSize = srcStat.Size() 306 return 307 } 308 309 if !mode.IsDir() { 310 rerr = errors.New("unsupported file type") 311 return 312 } 313 314 // It is directory, create PAX archive with its contents 315 archived = true 316 basePath := filepath.Base(srcPath) 317 rootPath, err := filepath.Abs(srcPath) 318 if err != nil { 319 rerr = err 320 return 321 } 322 type einfo struct { 323 path string 324 modTime time.Time 325 size int64 326 } 327 dirs := make([]einfo, 0, 1<<10) 328 files := make([]einfo, 0, 1<<10) 329 rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error { 330 if err != nil { 331 return err 332 } 333 if info.IsDir() { 334 // directory header, PAX record header+contents 335 srcSize += TarBlockSize + 2*TarBlockSize 336 dirs = append(dirs, einfo{path: path, modTime: info.ModTime()}) 337 } else { 338 // file header, PAX record header+contents, file content 339 srcSize += TarBlockSize + 2*TarBlockSize + info.Size() 340 if n := info.Size() % TarBlockSize; n != 0 { 341 srcSize += TarBlockSize - n // padding 342 } 343 files = append(files, einfo{ 344 path: path, 345 modTime: info.ModTime(), 346 size: info.Size(), 347 }) 348 } 349 return nil 350 }) 351 if rerr != nil { 352 return 353 } 354 355 r, w := io.Pipe() 356 reader = r 357 closer = DummyCloser{} 358 srcSize += 2 * TarBlockSize // termination block 359 360 go func() error { 361 tarWr := tar.NewWriter(w) 362 hdr := tar.Header{ 363 Typeflag: tar.TypeDir, 364 Mode: 0777, 365 PAXRecords: map[string]string{ 366 "comment": "Autogenerated by " + VersionGet(), 367 }, 368 Format: tar.FormatPAX, 369 } 370 for _, e := range dirs { 371 hdr.Name = basePath + e.path[len(rootPath):] 372 hdr.ModTime = e.modTime 373 if err = tarWr.WriteHeader(&hdr); err != nil { 374 return w.CloseWithError(err) 375 } 376 } 377 hdr.Typeflag = tar.TypeReg 378 hdr.Mode = 0666 379 for _, e := range files { 380 hdr.Name = basePath + e.path[len(rootPath):] 381 hdr.ModTime = e.modTime 382 hdr.Size = e.size 383 if err = tarWr.WriteHeader(&hdr); err != nil { 384 return w.CloseWithError(err) 385 } 386 fd, err := os.Open(e.path) 387 if err != nil { 388 fd.Close() 389 return w.CloseWithError(err) 390 } 391 if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil { 392 fd.Close() 393 return w.CloseWithError(err) 394 } 395 fd.Close() 396 } 397 if err = tarWr.Close(); err != nil { 398 return w.CloseWithError(err) 399 } 400 return w.Close() 401 }() 402 return 403} 404 405func (ctx *Ctx) TxFile( 406 node *Node, 407 nice uint8, 408 srcPath, dstPath string, 409 chunkSize, minSize, maxSize int64, 410 areaId *AreaId, 411) error { 412 dstPathSpecified := false 413 if dstPath == "" { 414 if srcPath == "-" { 415 return errors.New("Must provide destination filename") 416 } 417 dstPath = filepath.Base(srcPath) 418 } else { 419 dstPathSpecified = true 420 } 421 dstPath = filepath.Clean(dstPath) 422 if filepath.IsAbs(dstPath) { 423 return errors.New("Relative destination path required") 424 } 425 reader, closer, srcSize, archived, err := prepareTxFile(srcPath) 426 if closer != nil { 427 defer closer.Close() 428 } 429 if err != nil { 430 return err 431 } 432 if archived && !dstPathSpecified { 433 dstPath += TarExt 434 } 435 436 if chunkSize == 0 || (srcSize > 0 && srcSize <= chunkSize) { 437 pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath)) 438 if err != nil { 439 return err 440 } 441 _, finalSize, err := ctx.Tx( 442 node, pkt, nice, 443 srcSize, minSize, maxSize, 444 bufio.NewReader(reader), dstPath, areaId, 445 ) 446 les := LEs{ 447 {"Type", "file"}, 448 {"Node", node.Id}, 449 {"Nice", int(nice)}, 450 {"Src", srcPath}, 451 {"Dst", dstPath}, 452 {"Size", finalSize}, 453 } 454 logMsg := func(les LEs) string { 455 return fmt.Sprintf( 456 "File %s (%s) sent to %s:%s", 457 srcPath, 458 humanize.IBytes(uint64(finalSize)), 459 ctx.NodeName(node.Id), 460 dstPath, 461 ) 462 } 463 if err == nil { 464 ctx.LogI("tx", les, logMsg) 465 } else { 466 ctx.LogE("tx", les, err, logMsg) 467 } 468 return err 469 } 470 471 br := bufio.NewReader(reader) 472 var sizeFull int64 473 var chunkNum int 474 checksums := [][MTHSize]byte{} 475 for { 476 lr := io.LimitReader(br, chunkSize) 477 path := dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum) 478 pkt, err := NewPkt(PktTypeFile, nice, []byte(path)) 479 if err != nil { 480 return err 481 } 482 hsh := MTHNew(0, 0) 483 _, size, err := ctx.Tx( 484 node, pkt, nice, 485 0, minSize, maxSize, 486 io.TeeReader(lr, hsh), 487 path, areaId, 488 ) 489 490 les := LEs{ 491 {"Type", "file"}, 492 {"Node", node.Id}, 493 {"Nice", int(nice)}, 494 {"Src", srcPath}, 495 {"Dst", path}, 496 {"Size", size}, 497 } 498 logMsg := func(les LEs) string { 499 return fmt.Sprintf( 500 "File %s (%s) sent to %s:%s", 501 srcPath, 502 humanize.IBytes(uint64(size)), 503 ctx.NodeName(node.Id), 504 path, 505 ) 506 } 507 if err == nil { 508 ctx.LogI("tx", les, logMsg) 509 } else { 510 ctx.LogE("tx", les, err, logMsg) 511 return err 512 } 513 514 sizeFull += size - PktOverhead 515 var checksum [MTHSize]byte 516 hsh.Sum(checksum[:0]) 517 checksums = append(checksums, checksum) 518 chunkNum++ 519 if size < chunkSize { 520 break 521 } 522 if _, err = br.Peek(1); err != nil { 523 break 524 } 525 } 526 527 metaPkt := ChunkedMeta{ 528 Magic: MagicNNCPMv2.B, 529 FileSize: uint64(sizeFull), 530 ChunkSize: uint64(chunkSize), 531 Checksums: checksums, 532 } 533 var buf bytes.Buffer 534 _, err = xdr.Marshal(&buf, metaPkt) 535 if err != nil { 536 return err 537 } 538 path := dstPath + ChunkedSuffixMeta 539 pkt, err := NewPkt(PktTypeFile, nice, []byte(path)) 540 if err != nil { 541 return err 542 } 543 metaPktSize := int64(buf.Len()) 544 _, _, err = ctx.Tx( 545 node, 546 pkt, 547 nice, 548 metaPktSize, minSize, maxSize, 549 &buf, path, areaId, 550 ) 551 les := LEs{ 552 {"Type", "file"}, 553 {"Node", node.Id}, 554 {"Nice", int(nice)}, 555 {"Src", srcPath}, 556 {"Dst", path}, 557 {"Size", metaPktSize}, 558 } 559 logMsg := func(les LEs) string { 560 return fmt.Sprintf( 561 "File %s (%s) sent to %s:%s", 562 srcPath, 563 humanize.IBytes(uint64(metaPktSize)), 564 ctx.NodeName(node.Id), 565 path, 566 ) 567 } 568 if err == nil { 569 ctx.LogI("tx", les, logMsg) 570 } else { 571 ctx.LogE("tx", les, err, logMsg) 572 } 573 return err 574} 575 576func (ctx *Ctx) TxFreq( 577 node *Node, 578 nice, replyNice uint8, 579 srcPath, dstPath string, 580 minSize int64, 581) error { 582 dstPath = filepath.Clean(dstPath) 583 if filepath.IsAbs(dstPath) { 584 return errors.New("Relative destination path required") 585 } 586 srcPath = filepath.Clean(srcPath) 587 if filepath.IsAbs(srcPath) { 588 return errors.New("Relative source path required") 589 } 590 pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath)) 591 if err != nil { 592 return err 593 } 594 src := strings.NewReader(dstPath) 595 size := int64(src.Len()) 596 _, _, err = ctx.Tx(node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil) 597 les := LEs{ 598 {"Type", "freq"}, 599 {"Node", node.Id}, 600 {"Nice", int(nice)}, 601 {"ReplyNice", int(replyNice)}, 602 {"Src", srcPath}, 603 {"Dst", dstPath}, 604 } 605 logMsg := func(les LEs) string { 606 return fmt.Sprintf( 607 "File request from %s:%s to %s sent", 608 ctx.NodeName(node.Id), srcPath, 609 dstPath, 610 ) 611 } 612 if err == nil { 613 ctx.LogI("tx", les, logMsg) 614 } else { 615 ctx.LogE("tx", les, err, logMsg) 616 } 617 return err 618} 619 620func (ctx *Ctx) TxExec( 621 node *Node, 622 nice, replyNice uint8, 623 handle string, 624 args []string, 625 in io.Reader, 626 minSize int64, maxSize int64, 627 noCompress bool, 628 areaId *AreaId, 629) error { 630 path := make([][]byte, 0, 1+len(args)) 631 path = append(path, []byte(handle)) 632 for _, arg := range args { 633 path = append(path, []byte(arg)) 634 } 635 pktType := PktTypeExec 636 if noCompress { 637 pktType = PktTypeExecFat 638 } 639 pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0})) 640 if err != nil { 641 return err 642 } 643 compressErr := make(chan error, 1) 644 if !noCompress { 645 pr, pw := io.Pipe() 646 compressor, err := zstd.NewWriter(pw, zstd.WithEncoderLevel(zstd.SpeedDefault)) 647 if err != nil { 648 return err 649 } 650 go func(r io.Reader) { 651 if _, err := io.Copy(compressor, r); err != nil { 652 compressErr <- err 653 return 654 } 655 compressErr <- compressor.Close() 656 pw.Close() 657 }(in) 658 in = pr 659 } 660 _, size, err := ctx.Tx(node, pkt, nice, 0, minSize, maxSize, in, handle, areaId) 661 if !noCompress { 662 e := <-compressErr 663 if err == nil { 664 err = e 665 } 666 } 667 dst := strings.Join(append([]string{handle}, args...), " ") 668 les := LEs{ 669 {"Type", "exec"}, 670 {"Node", node.Id}, 671 {"Nice", int(nice)}, 672 {"ReplyNice", int(replyNice)}, 673 {"Dst", dst}, 674 {"Size", size}, 675 } 676 logMsg := func(les LEs) string { 677 return fmt.Sprintf( 678 "Exec sent to %s@%s (%s)", 679 ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)), 680 ) 681 } 682 if err == nil { 683 ctx.LogI("tx", les, logMsg) 684 } else { 685 ctx.LogE("tx", les, err, logMsg) 686 } 687 return err 688} 689 690func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error { 691 les := LEs{ 692 {"Type", "trns"}, 693 {"Node", node.Id}, 694 {"Nice", int(nice)}, 695 {"Size", size}, 696 } 697 logMsg := func(les LEs) string { 698 return fmt.Sprintf( 699 "Transitional packet to %s (%s) (nice %s)", 700 ctx.NodeName(node.Id), 701 humanize.IBytes(uint64(size)), 702 NicenessFmt(nice), 703 ) 704 } 705 ctx.LogD("tx", les, logMsg) 706 if !ctx.IsEnoughSpace(size) { 707 err := errors.New("is not enough space") 708 ctx.LogE("tx", les, err, logMsg) 709 return err 710 } 711 tmp, err := ctx.NewTmpFileWHash() 712 if err != nil { 713 return err 714 } 715 if _, err = CopyProgressed( 716 tmp.W, src, "Tx trns", 717 LEs{{"Pkt", node.Id.String()}, {"FullSize", size}}, 718 ctx.ShowPrgrs, 719 ); err != nil { 720 return err 721 } 722 nodePath := filepath.Join(ctx.Spool, node.Id.String()) 723 err = tmp.Commit(filepath.Join(nodePath, string(TTx))) 724 if err == nil { 725 ctx.LogI("tx", les, logMsg) 726 } else { 727 ctx.LogI("tx", append(les, LE{"Err", err}), logMsg) 728 } 729 os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) 730 return err 731} 732