1// SPDX-License-Identifier: ISC 2// Copyright (c) 2014-2020 Bitmark Inc. 3// Use of this source code is governed by an ISC 4// license that can be found in the LICENSE file. 5 6package peer 7 8import ( 9 "encoding/binary" 10 11 "github.com/bitmark-inc/bitmarkd/announce" 12 "github.com/bitmark-inc/bitmarkd/asset" 13 "github.com/bitmark-inc/bitmarkd/fault" 14 "github.com/bitmark-inc/bitmarkd/messagebus" 15 "github.com/bitmark-inc/bitmarkd/mode" 16 "github.com/bitmark-inc/bitmarkd/pay" 17 "github.com/bitmark-inc/bitmarkd/payment" 18 "github.com/bitmark-inc/bitmarkd/reservoir" 19 "github.com/bitmark-inc/bitmarkd/storage" 20 "github.com/bitmark-inc/bitmarkd/transactionrecord" 21 "github.com/bitmark-inc/logger" 22) 23 24// process received data 25func processSubscription(log *logger.L, command string, arguments [][]byte) { 26 27 dataLength := len(arguments) 28 switch command { 29 case "block": 30 if dataLength < 1 { 31 log.Debugf("block with too few data: %d items", dataLength) 32 return 33 } 34 log.Infof("received block: %x", arguments[0]) 35 if !mode.Is(mode.Normal) { 36 err := fault.NotAvailableDuringSynchronise 37 log.Debugf("failed assets: error: %s", err) 38 } else { 39 messagebus.Bus.Blockstore.Send("remote", arguments[0]) 40 } 41 42 case "assets": 43 if dataLength < 1 { 44 log.Debugf("assets with too few data: %d items", dataLength) 45 return 46 } 47 log.Infof("received assets: %x", arguments[0]) 48 err := processAssets(arguments[0]) 49 if nil != err { 50 log.Debugf("failed assets: error: %s", err) 51 } else { 52 messagebus.Bus.Broadcast.Send("assets", arguments[0]) 53 } 54 55 case "issues": 56 if dataLength < 1 { 57 log.Debugf("issues with too few data: %d items", dataLength) 58 return 59 } 60 log.Infof("received issues: %x", arguments[0]) 61 err := processIssues(arguments[0]) 62 if nil != err { 63 log.Debugf("failed issues: error: %s", err) 64 } else { 65 messagebus.Bus.Broadcast.Send("issues", arguments[0]) 66 } 67 68 case "transfer": 69 if dataLength < 1 { 70 log.Debugf("transfer with too few data: %d items", dataLength) 71 return 72 } 73 log.Infof("received transfer: %x", arguments[0]) 74 err := processTransfer(arguments[0]) 75 if nil != err { 76 log.Debugf("failed transfer: error: %s", err) 77 } else { 78 messagebus.Bus.Broadcast.Send("transfer", arguments[0]) 79 } 80 81 case "proof": 82 if dataLength < 1 { 83 log.Debugf("proof with too few data: %d items", dataLength) 84 return 85 } 86 log.Infof("received proof: %x", arguments[0]) 87 err := processProof(arguments[0]) 88 if nil != err { 89 log.Debugf("failed proof: error: %s", err) 90 } else { 91 messagebus.Bus.Broadcast.Send("proof", arguments[0]) 92 } 93 94 case "rpc": 95 if dataLength < 3 { 96 log.Debugf("rpc with too few data: %d items", dataLength) 97 return 98 } 99 if 8 != len(arguments[2]) { 100 log.Debug("rpc with invalid timestamp") 101 return 102 } 103 timestamp := binary.BigEndian.Uint64(arguments[2]) 104 log.Infof("received rpc: fingerprint: %x rpc: %x timestamp: %d", arguments[0], arguments[1], timestamp) 105 if announce.AddRPC(arguments[0], arguments[1], timestamp) { 106 messagebus.Bus.Broadcast.Send("rpc", arguments[0:3]...) 107 } 108 109 case "peer": 110 if dataLength < 3 { 111 log.Debugf("peer with too few data: %d items", dataLength) 112 return 113 } 114 if 8 != len(arguments[2]) { 115 log.Debug("peer with invalid timestamp") 116 return 117 } 118 timestamp := binary.BigEndian.Uint64(arguments[2]) 119 log.Infof("received peer: %x listener: %x timestamp: %d", arguments[0], arguments[1], timestamp) 120 if announce.AddPeer(arguments[0], arguments[1], timestamp) { 121 messagebus.Bus.Broadcast.Send("peer", arguments[0:3]...) 122 } 123 124 default: 125 log.Debugf("received unhandled command: %q arguments: %x", command, arguments) 126 127 } 128} 129 130// un pack each asset and cache them 131func processAssets(packed []byte) error { 132 133 if 0 == len(packed) { 134 return fault.MissingParameters 135 } 136 137 if !mode.Is(mode.Normal) { 138 return fault.NotAvailableDuringSynchronise 139 } 140 141 ok := false 142 for 0 != len(packed) { 143 transaction, n, err := transactionrecord.Packed(packed).Unpack(mode.IsTesting()) 144 if nil != err { 145 return err 146 } 147 148 switch tx := transaction.(type) { 149 case *transactionrecord.AssetData: 150 _, packedAsset, err := asset.Cache(tx, storage.Pool.Assets) 151 if nil != err { 152 return err 153 } 154 if nil != packedAsset { 155 ok = true 156 } 157 158 default: 159 return fault.TransactionIsNotAnAsset 160 } 161 packed = packed[n:] 162 } 163 164 // all items were duplicates 165 if !ok { 166 return fault.NoNewTransactions 167 } 168 return nil 169} 170 171// un pack each issue and cache them 172func processIssues(packed []byte) error { 173 174 if 0 == len(packed) { 175 return fault.MissingParameters 176 } 177 178 if !mode.Is(mode.Normal) { 179 return fault.NotAvailableDuringSynchronise 180 } 181 182 packedIssues := transactionrecord.Packed(packed) 183 issueCount := 0 // for payment difficulty 184 185 issues := make([]*transactionrecord.BitmarkIssue, 0, reservoir.MaximumIssues) 186 for 0 != len(packedIssues) { 187 transaction, n, err := packedIssues.Unpack(mode.IsTesting()) 188 if nil != err { 189 return err 190 } 191 192 switch tx := transaction.(type) { 193 case *transactionrecord.BitmarkIssue: 194 issues = append(issues, tx) 195 issueCount += 1 196 default: 197 return fault.TransactionIsNotAnIssue 198 } 199 packedIssues = packedIssues[n:] 200 } 201 if 0 == len(issues) { 202 return fault.MissingParameters 203 } 204 205 rsvr := reservoir.Get() 206 _, duplicate, err := rsvr.StoreIssues(issues) 207 if nil != err { 208 return err 209 } 210 211 if duplicate { 212 return fault.TransactionAlreadyExists 213 } 214 215 return nil 216} 217 218// unpack transfer and process it 219func processTransfer(packed []byte) error { 220 221 if 0 == len(packed) { 222 return fault.MissingParameters 223 } 224 225 if !mode.Is(mode.Normal) { 226 return fault.NotAvailableDuringSynchronise 227 } 228 229 transaction, _, err := transactionrecord.Packed(packed).Unpack(mode.IsTesting()) 230 if nil != err { 231 return err 232 } 233 234 duplicate := false 235 236 transfer, ok := transaction.(transactionrecord.BitmarkTransfer) 237 238 rsvr := reservoir.Get() 239 if ok { 240 241 _, duplicate, err = rsvr.StoreTransfer(transfer) 242 243 } else { 244 switch tx := transaction.(type) { 245 246 case *transactionrecord.ShareGrant: 247 _, duplicate, err = rsvr.StoreGrant(tx) 248 249 case *transactionrecord.ShareSwap: 250 _, duplicate, err = rsvr.StoreSwap(tx) 251 252 default: 253 return fault.TransactionIsNotATransfer 254 } 255 } 256 257 if nil != err { 258 return err 259 } 260 261 if duplicate { 262 return fault.TransactionAlreadyExists 263 } 264 265 return nil 266} 267 268// process proof block 269func processProof(packed []byte) error { 270 271 if 0 == len(packed) { 272 return fault.MissingParameters 273 } 274 275 if !mode.Is(mode.Normal) { 276 return fault.NotAvailableDuringSynchronise 277 } 278 279 var payId pay.PayId 280 nonceLength := len(packed) - len(payId) // could be negative 281 if nonceLength < payment.MinimumNonceLength || nonceLength > payment.MaximumNonceLength { 282 return fault.InvalidNonce 283 } 284 285 copy(payId[:], packed[:len(payId)]) 286 nonce := packed[len(payId):] 287 rsvr := reservoir.Get() 288 status := rsvr.TryProof(payId, nonce) 289 if reservoir.TrackingAccepted != status { 290 // pay id already processed or was invalid 291 return fault.PayIdAlreadyUsed 292 } 293 294 return nil 295} 296