1/* 2NNCP -- Node to Node copy, utilities for store-and-forward data exchange 3Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org> 4 5This program is free software: you can redistribute it and/or modify 6it under the terms of the GNU General Public License as published by 7the Free Software Foundation, version 3 of the License. 8 9This program is distributed in the hope that it will be useful, 10but WITHOUT ANY WARRANTY; without even the implied warranty of 11MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12GNU General Public License for more details. 13 14You should have received a copy of the GNU General Public License 15along with this program. If not, see <http://www.gnu.org/licenses/>. 16*/ 17 18// Create/digest stream of NNCP encrypted packets. 19package main 20 21import ( 22 "archive/tar" 23 "bufio" 24 "bytes" 25 "errors" 26 "flag" 27 "fmt" 28 "io" 29 "io/ioutil" 30 "log" 31 "os" 32 "path/filepath" 33 "strings" 34 35 xdr "github.com/davecgh/go-xdr/xdr2" 36 "github.com/dustin/go-humanize" 37 "go.cypherpunks.ru/nncp/v8" 38) 39 40const ( 41 CopyBufSize = 1 << 17 42) 43 44func usage() { 45 fmt.Fprintf(os.Stderr, nncp.UsageHeader()) 46 fmt.Fprintf(os.Stderr, "nncp-bundle -- Create/digest stream of NNCP encrypted packets\n\n") 47 fmt.Fprintf(os.Stderr, "Usage: %s [options] -tx [-delete] NODE [NODE ...] > ...\n", os.Args[0]) 48 fmt.Fprintf(os.Stderr, " %s [options] -rx -delete [-dryrun] [NODE ...] < ...\n", os.Args[0]) 49 fmt.Fprintf(os.Stderr, " %s [options] -rx [-check] [-dryrun] [NODE ...] < ...\n", os.Args[0]) 50 fmt.Fprintln(os.Stderr, "Options:") 51 flag.PrintDefaults() 52} 53 54func main() { 55 var ( 56 cfgPath = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file") 57 niceRaw = flag.String("nice", nncp.NicenessFmt(255), "Minimal required niceness") 58 doRx = flag.Bool("rx", false, "Receive packets") 59 doTx = flag.Bool("tx", false, "Transfer packets") 60 doDelete = flag.Bool("delete", false, "Delete transferred packets") 61 doCheck = flag.Bool("check", false, "Check integrity while receiving") 62 dryRun = flag.Bool("dryrun", false, "Do no writes") 63 spoolPath = flag.String("spool", "", "Override path to spool") 64 logPath = flag.String("log", "", "Override path to logfile") 65 quiet = flag.Bool("quiet", false, "Print only errors") 66 showPrgrs = flag.Bool("progress", false, "Force progress showing") 67 omitPrgrs = flag.Bool("noprogress", false, "Omit progress showing") 68 debug = flag.Bool("debug", false, "Print debug messages") 69 version = flag.Bool("version", false, "Print version information") 70 warranty = flag.Bool("warranty", false, "Print warranty information") 71 ) 72 log.SetFlags(log.Lshortfile) 73 flag.Usage = usage 74 flag.Parse() 75 if *warranty { 76 fmt.Println(nncp.Warranty) 77 return 78 } 79 if *version { 80 fmt.Println(nncp.VersionGet()) 81 return 82 } 83 nice, err := nncp.NicenessParse(*niceRaw) 84 if err != nil { 85 log.Fatalln(err) 86 } 87 if *doRx && *doTx { 88 log.Fatalln("-rx and -tx can not be set simultaneously") 89 } 90 if !*doRx && !*doTx { 91 log.Fatalln("At least one of -rx and -tx must be specified") 92 } 93 94 ctx, err := nncp.CtxFromCmdline( 95 *cfgPath, 96 *spoolPath, 97 *logPath, 98 *quiet, 99 *showPrgrs, 100 *omitPrgrs, 101 *debug, 102 ) 103 if err != nil { 104 log.Fatalln("Error during initialization:", err) 105 } 106 107 nodeIds := make(map[nncp.NodeId]struct{}, flag.NArg()) 108 for i := 0; i < flag.NArg(); i++ { 109 node, err := ctx.FindNode(flag.Arg(i)) 110 if err != nil { 111 log.Fatalln("Invalid node specified:", err) 112 } 113 nodeIds[*node.Id] = struct{}{} 114 } 115 116 ctx.Umask() 117 118 if *doTx { 119 var pktName string 120 bufStdout := bufio.NewWriter(os.Stdout) 121 tarWr := tar.NewWriter(bufStdout) 122 for nodeId := range nodeIds { 123 for job := range ctx.Jobs(&nodeId, nncp.TTx) { 124 pktName = filepath.Base(job.Path) 125 les := nncp.LEs{ 126 {K: "XX", V: string(nncp.TTx)}, 127 {K: "Node", V: nodeId.String()}, 128 {K: "Pkt", V: pktName}, 129 } 130 if job.PktEnc.Nice > nice { 131 ctx.LogD("bundle-tx-too-nice", les, func(les nncp.LEs) string { 132 return fmt.Sprintf( 133 "Bundle transfer %s/tx/%s: too nice %s", 134 ctx.NodeName(&nodeId), 135 pktName, 136 nncp.NicenessFmt(job.PktEnc.Nice), 137 ) 138 }) 139 continue 140 } 141 fd, err := os.Open(job.Path) 142 if err != nil { 143 log.Fatalln("Error during opening:", err) 144 } 145 if err = tarWr.WriteHeader(&tar.Header{ 146 Format: tar.FormatUSTAR, 147 Name: nncp.NNCPBundlePrefix, 148 Mode: 0700, 149 Typeflag: tar.TypeDir, 150 }); err != nil { 151 log.Fatalln("Error writing tar header:", err) 152 } 153 if err = tarWr.WriteHeader(&tar.Header{ 154 Format: tar.FormatPAX, 155 Name: strings.Join([]string{ 156 nncp.NNCPBundlePrefix, 157 nodeId.String(), 158 ctx.SelfId.String(), 159 pktName, 160 }, "/"), 161 Mode: 0400, 162 Size: job.Size, 163 Typeflag: tar.TypeReg, 164 }); err != nil { 165 log.Fatalln("Error writing tar header:", err) 166 } 167 if _, err = nncp.CopyProgressed( 168 tarWr, bufio.NewReader(fd), "Tx", 169 append(les, nncp.LEs{ 170 {K: "Pkt", V: nncp.Base32Codec.EncodeToString(job.HshValue[:])}, 171 {K: "FullSize", V: job.Size}, 172 }...), 173 ctx.ShowPrgrs, 174 ); err != nil { 175 log.Fatalln("Error during copying to tar:", err) 176 } 177 if err = fd.Close(); err != nil { 178 log.Fatalln("Error during closing:", err) 179 } 180 if err = tarWr.Flush(); err != nil { 181 log.Fatalln("Error during tar flushing:", err) 182 } 183 if err = bufStdout.Flush(); err != nil { 184 log.Fatalln("Error during stdout flushing:", err) 185 } 186 if *doDelete { 187 if err = os.Remove(job.Path); err != nil { 188 log.Fatalln("Error during deletion:", err) 189 } else if ctx.HdrUsage { 190 os.Remove(nncp.JobPath2Hdr(job.Path)) 191 } 192 } 193 ctx.LogI( 194 "bundle-tx", 195 append(les, nncp.LE{K: "Size", V: job.Size}), 196 func(les nncp.LEs) string { 197 return fmt.Sprintf( 198 "Bundle transfer, sent to node %s %s (%s)", 199 ctx.NodeName(&nodeId), 200 pktName, 201 humanize.IBytes(uint64(job.Size)), 202 ) 203 }, 204 ) 205 } 206 } 207 if err = tarWr.Close(); err != nil { 208 log.Fatalln("Error during tar closing:", err) 209 } 210 } else { 211 bufStdin := bufio.NewReaderSize(os.Stdin, CopyBufSize*2) 212 pktEncBuf := make([]byte, nncp.PktEncOverhead) 213 var pktEnc *nncp.PktEnc 214 for { 215 peeked, err := bufStdin.Peek(CopyBufSize) 216 if err != nil && err != io.EOF { 217 log.Fatalln("Error during reading:", err) 218 } 219 prefixIdx := bytes.Index(peeked, []byte(nncp.NNCPBundlePrefix)) 220 if prefixIdx == -1 { 221 if err == io.EOF { 222 break 223 } 224 bufStdin.Discard(bufStdin.Buffered() - (len(nncp.NNCPBundlePrefix) - 1)) 225 continue 226 } 227 if _, err = bufStdin.Discard(prefixIdx); err != nil { 228 panic(err) 229 } 230 tarR := tar.NewReader(bufStdin) 231 entry, err := tarR.Next() 232 if err != nil { 233 if err != io.EOF { 234 ctx.LogD( 235 "bundle-rx-read-tar", 236 nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}}, 237 func(les nncp.LEs) string { 238 return "Bundle transfer rx: reading tar" 239 }, 240 ) 241 } 242 continue 243 } 244 if entry.Typeflag != tar.TypeDir { 245 ctx.LogD( 246 "bundle-rx-read-tar", 247 nncp.LEs{ 248 {K: "XX", V: string(nncp.TRx)}, 249 {K: "Err", V: errors.New("expected NNCP/")}, 250 }, 251 func(les nncp.LEs) string { 252 return "Bundle transfer rx: reading tar" 253 }, 254 ) 255 continue 256 } 257 entry, err = tarR.Next() 258 if err != nil { 259 if err != io.EOF { 260 ctx.LogD( 261 "bundle-rx-read-tar", 262 nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}}, 263 func(les nncp.LEs) string { 264 return "Bundle transfer rx: reading tar" 265 }, 266 ) 267 } 268 continue 269 } 270 les := nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Pkt", V: entry.Name}} 271 logMsg := func(les nncp.LEs) string { 272 return "Bundle transfer rx/" + entry.Name 273 } 274 if entry.Size < nncp.PktEncOverhead { 275 ctx.LogD("bundle-rx-too-small", les, func(les nncp.LEs) string { 276 return logMsg(les) + ": too small packet" 277 }) 278 continue 279 } 280 if !ctx.IsEnoughSpace(entry.Size) { 281 ctx.LogE("bundle-rx", les, errors.New("not enough spool space"), logMsg) 282 continue 283 } 284 pktName := filepath.Base(entry.Name) 285 if _, err = nncp.Base32Codec.DecodeString(pktName); err != nil { 286 ctx.LogD( 287 "bundle-rx", 288 append(les, nncp.LE{K: "Err", V: "bad packet name"}), 289 logMsg, 290 ) 291 continue 292 } 293 if _, err = io.ReadFull(tarR, pktEncBuf); err != nil { 294 ctx.LogD( 295 "bundle-rx", 296 append(les, nncp.LE{K: "Err", V: err}), 297 logMsg, 298 ) 299 continue 300 } 301 if _, err = xdr.Unmarshal(bytes.NewReader(pktEncBuf), &pktEnc); err != nil { 302 ctx.LogD( 303 "bundle-rx", 304 append(les, nncp.LE{K: "Err", V: "Bad packet structure"}), 305 logMsg, 306 ) 307 continue 308 } 309 switch pktEnc.Magic { 310 case nncp.MagicNNCPEv1.B: 311 err = nncp.MagicNNCPEv1.TooOld() 312 case nncp.MagicNNCPEv2.B: 313 err = nncp.MagicNNCPEv2.TooOld() 314 case nncp.MagicNNCPEv3.B: 315 err = nncp.MagicNNCPEv3.TooOld() 316 case nncp.MagicNNCPEv4.B: 317 err = nncp.MagicNNCPEv4.TooOld() 318 case nncp.MagicNNCPEv5.B: 319 default: 320 err = errors.New("Bad packet magic number") 321 } 322 if err != nil { 323 ctx.LogD( 324 "bundle-rx", 325 append(les, nncp.LE{K: "Err", V: err.Error()}), 326 logMsg, 327 ) 328 continue 329 } 330 if pktEnc.Nice > nice { 331 ctx.LogD("bundle-rx-too-nice", les, func(les nncp.LEs) string { 332 return logMsg(les) + ": too nice" 333 }) 334 continue 335 } 336 if *pktEnc.Sender == *ctx.SelfId && *doDelete { 337 if len(nodeIds) > 0 { 338 if _, exists := nodeIds[*pktEnc.Recipient]; !exists { 339 ctx.LogD("bundle-tx-skip", les, func(les nncp.LEs) string { 340 return logMsg(les) + ": recipient is not requested" 341 }) 342 continue 343 } 344 } 345 nodeId32 := nncp.Base32Codec.EncodeToString(pktEnc.Recipient[:]) 346 les := nncp.LEs{ 347 {K: "XX", V: string(nncp.TTx)}, 348 {K: "Node", V: nodeId32}, 349 {K: "Pkt", V: pktName}, 350 } 351 logMsg = func(les nncp.LEs) string { 352 return fmt.Sprintf("Bundle transfer %s/tx/%s", nodeId32, pktName) 353 } 354 dstPath := filepath.Join(ctx.Spool, nodeId32, string(nncp.TTx), pktName) 355 if _, err = os.Stat(dstPath); err != nil { 356 ctx.LogD("bundle-tx-missing", les, func(les nncp.LEs) string { 357 return logMsg(les) + ": packet is already missing" 358 }) 359 continue 360 } 361 hsh := nncp.MTHNew(entry.Size, 0) 362 if _, err = hsh.Write(pktEncBuf); err != nil { 363 log.Fatalln("Error during writing:", err) 364 } 365 if _, err = nncp.CopyProgressed( 366 hsh, tarR, "Rx", 367 append(les, nncp.LE{K: "FullSize", V: entry.Size}), 368 ctx.ShowPrgrs, 369 ); err != nil { 370 log.Fatalln("Error during copying:", err) 371 } 372 if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) == pktName { 373 ctx.LogI("bundle-tx-removed", les, func(les nncp.LEs) string { 374 return logMsg(les) + ": removed" 375 }) 376 if !*dryRun { 377 os.Remove(dstPath) 378 if ctx.HdrUsage { 379 os.Remove(nncp.JobPath2Hdr(dstPath)) 380 } 381 } 382 } else { 383 ctx.LogE("bundle-tx", les, errors.New("bad checksum"), logMsg) 384 } 385 continue 386 } 387 if *pktEnc.Recipient != *ctx.SelfId { 388 ctx.LogD("nncp-bundle", les, func(les nncp.LEs) string { 389 return logMsg(les) + ": unknown recipient" 390 }) 391 continue 392 } 393 if len(nodeIds) > 0 { 394 if _, exists := nodeIds[*pktEnc.Sender]; !exists { 395 ctx.LogD("bundle-rx-skip", les, func(les nncp.LEs) string { 396 return logMsg(les) + ": sender is not requested" 397 }) 398 continue 399 } 400 } 401 sender := nncp.Base32Codec.EncodeToString(pktEnc.Sender[:]) 402 les = nncp.LEs{ 403 {K: "XX", V: string(nncp.TRx)}, 404 {K: "Node", V: sender}, 405 {K: "Pkt", V: pktName}, 406 {K: "FullSize", V: entry.Size}, 407 } 408 logMsg = func(les nncp.LEs) string { 409 return fmt.Sprintf("Bundle transfer %s/rx/%s", sender, pktName) 410 } 411 dstDirPath := filepath.Join(ctx.Spool, sender, string(nncp.TRx)) 412 dstPath := filepath.Join(dstDirPath, pktName) 413 if _, err = os.Stat(dstPath); err == nil || !os.IsNotExist(err) { 414 ctx.LogD("bundle-rx-exists", les, func(les nncp.LEs) string { 415 return logMsg(les) + ": packet already exists" 416 }) 417 continue 418 } 419 if _, err = os.Stat(filepath.Join( 420 dstDirPath, nncp.SeenDir, pktName, 421 )); err == nil || !os.IsNotExist(err) { 422 ctx.LogD("bundle-rx-seen", les, func(les nncp.LEs) string { 423 return logMsg(les) + ": packet already seen" 424 }) 425 continue 426 } 427 if *doCheck { 428 if *dryRun { 429 hsh := nncp.MTHNew(entry.Size, 0) 430 if _, err = hsh.Write(pktEncBuf); err != nil { 431 log.Fatalln("Error during writing:", err) 432 } 433 if _, err = nncp.CopyProgressed(hsh, tarR, "check", les, ctx.ShowPrgrs); err != nil { 434 log.Fatalln("Error during copying:", err) 435 } 436 if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) != pktName { 437 ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg) 438 continue 439 } 440 } else { 441 tmp, err := ctx.NewTmpFileWHash() 442 if err != nil { 443 log.Fatalln("Error during temporary file creation:", err) 444 } 445 if _, err = tmp.W.Write(pktEncBuf); err != nil { 446 log.Fatalln("Error during writing:", err) 447 } 448 if _, err = nncp.CopyProgressed(tmp.W, tarR, "check", les, ctx.ShowPrgrs); err != nil { 449 log.Fatalln("Error during copying:", err) 450 } 451 if err = tmp.W.Flush(); err != nil { 452 log.Fatalln("Error during flusing:", err) 453 } 454 if nncp.Base32Codec.EncodeToString(tmp.Hsh.Sum(nil)) == pktName { 455 if err = tmp.Commit(dstDirPath); err != nil { 456 log.Fatalln("Error during commiting:", err) 457 } 458 } else { 459 ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg) 460 tmp.Cancel() 461 continue 462 } 463 } 464 } else { 465 if *dryRun { 466 if _, err = nncp.CopyProgressed(ioutil.Discard, tarR, "Rx", les, ctx.ShowPrgrs); err != nil { 467 log.Fatalln("Error during copying:", err) 468 } 469 } else { 470 tmp, err := ctx.NewTmpFile() 471 if err != nil { 472 log.Fatalln("Error during temporary file creation:", err) 473 } 474 bufTmp := bufio.NewWriterSize(tmp, CopyBufSize) 475 if _, err = bufTmp.Write(pktEncBuf); err != nil { 476 log.Fatalln("Error during writing:", err) 477 } 478 if _, err = nncp.CopyProgressed(bufTmp, tarR, "Rx", les, ctx.ShowPrgrs); err != nil { 479 log.Fatalln("Error during copying:", err) 480 } 481 if err = bufTmp.Flush(); err != nil { 482 log.Fatalln("Error during flushing:", err) 483 } 484 if err = tmp.Sync(); err != nil { 485 log.Fatalln("Error during syncing:", err) 486 } 487 if err = tmp.Close(); err != nil { 488 log.Fatalln("Error during closing:", err) 489 } 490 if err = os.MkdirAll(dstDirPath, os.FileMode(0777)); err != nil { 491 log.Fatalln("Error during mkdir:", err) 492 } 493 if err = os.Rename(tmp.Name(), dstPath); err != nil { 494 log.Fatalln("Error during renaming:", err) 495 } 496 if err = nncp.DirSync(dstDirPath); err != nil { 497 log.Fatalln("Error during syncing:", err) 498 } 499 if ctx.HdrUsage { 500 ctx.HdrWrite(pktEncBuf, dstPath) 501 } 502 } 503 } 504 for _, le := range les { 505 if le.K == "FullSize" { 506 les = append(les, nncp.LE{K: "Size", V: le.V}) 507 break 508 } 509 } 510 ctx.LogI("bundle-rx", les, func(les nncp.LEs) string { 511 return fmt.Sprintf( 512 "Bundle transfer, received from %s %s (%s)", 513 sender, pktName, humanize.IBytes(uint64(entry.Size)), 514 ) 515 }) 516 } 517 } 518} 519