1/* 2Copyright 2011 The Perkeep Authors 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package client 18 19import ( 20 "bytes" 21 "context" 22 "errors" 23 "fmt" 24 "io" 25 "io/ioutil" 26 "log" 27 "mime/multipart" 28 "net/http" 29 "net/url" 30 "os" 31 "strings" 32 "sync" 33 "time" 34 35 "perkeep.org/internal/hashutil" 36 "perkeep.org/internal/httputil" 37 "perkeep.org/pkg/blob" 38 "perkeep.org/pkg/blobserver" 39 "perkeep.org/pkg/blobserver/protocol" 40 "perkeep.org/pkg/constants" 41 "perkeep.org/pkg/env" 42 "perkeep.org/pkg/schema" 43) 44 45// UploadHandle contains the parameters is a request to upload a blob. 46type UploadHandle struct { 47 // BlobRef is the required blobref of the blob to upload. 48 BlobRef blob.Ref 49 50 // Contents is the blob data. 51 Contents io.Reader 52 53 // Size optionally specifies the size of Contents. 54 // If <= 0, the Contents are slurped into memory to count the size. 55 Size uint32 56 57 // Vivify optionally instructs the server to create a 58 // permanode for this blob. If used, the blob should be a 59 // "file" schema blob. This is typically used by 60 // lesser-trusted clients (such a mobile phones) which don't 61 // have rights to do signing directly. 62 Vivify bool 63 64 // SkipStat indicates whether the stat check (checking whether 65 // the server already has the blob) will be skipped and the 66 // blob should be uploaded immediately. This is useful for 67 // small blobs that the server is unlikely to already have 68 // (e.g. new claims). 69 SkipStat bool 70} 71 72type PutResult struct { 73 BlobRef blob.Ref 74 Size uint32 75 Skipped bool // already present on blobserver 76} 77 78func (pr *PutResult) SizedBlobRef() blob.SizedRef { 79 return blob.SizedRef{pr.BlobRef, pr.Size} 80} 81 82// TODO: ditch this type and use protocol.StatResponse directly? 83// Or at least make HaveMap keyed by a blob.Ref instead of a string. 84type statResponse struct { 85 HaveMap map[string]blob.SizedRef 86 canLongPoll bool 87} 88 89type ResponseFormatError error 90 91var ( 92 multipartOnce sync.Once 93 multipartOverhead int64 94) 95 96// multipartOverhead is how many extra bytes mime/multipart's 97// Writer adds around content 98func getMultipartOverhead() int64 { 99 multipartOnce.Do(func() { 100 var b bytes.Buffer 101 w := multipart.NewWriter(&b) 102 part, _ := w.CreateFormFile("0", "0") 103 104 dummyContents := []byte("0") 105 part.Write(dummyContents) 106 107 w.Close() 108 multipartOverhead = int64(b.Len()) - 3 // remove what was added 109 }) 110 return multipartOverhead 111} 112 113func parseStatResponse(res *http.Response) (*statResponse, error) { 114 var s = &statResponse{HaveMap: make(map[string]blob.SizedRef)} 115 var pres protocol.StatResponse 116 if err := httputil.DecodeJSON(res, &pres); err != nil { 117 return nil, ResponseFormatError(err) 118 } 119 120 s.canLongPoll = pres.CanLongPoll 121 for _, statItem := range pres.Stat { 122 br := statItem.Ref 123 if !br.Valid() { 124 continue 125 } 126 s.HaveMap[br.String()] = blob.SizedRef{br, uint32(statItem.Size)} 127 } 128 return s, nil 129} 130 131// NewUploadHandleFromString returns an upload handle 132func NewUploadHandleFromString(data string) *UploadHandle { 133 bref := blob.RefFromString(data) 134 r := strings.NewReader(data) 135 return &UploadHandle{BlobRef: bref, Size: uint32(len(data)), Contents: r} 136} 137 138// TODO(bradfitz): delete most of this. use new camlistore.org/pkg/blobserver/protocol types instead 139// of a map[string]interface{}. 140func (c *Client) responseJSONMap(requestName string, resp *http.Response) (map[string]interface{}, error) { 141 if resp.StatusCode != 200 { 142 c.printf("After %s request, failed to JSON from response; status code is %d", requestName, resp.StatusCode) 143 io.Copy(os.Stderr, resp.Body) 144 return nil, fmt.Errorf("after %s request, HTTP response code is %d; no JSON to parse", requestName, resp.StatusCode) 145 } 146 jmap := make(map[string]interface{}) 147 if err := httputil.DecodeJSON(resp, &jmap); err != nil { 148 return nil, err 149 } 150 return jmap, nil 151} 152 153// statReq is a request to stat a blob. 154type statReq struct { 155 br blob.Ref 156 dest chan<- blob.SizedRef // written to on success 157 errc chan<- error // written to on both failure and success (after any dest) 158} 159 160func (c *Client) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error { 161 if c.sto != nil { 162 return c.sto.StatBlobs(ctx, blobs, fn) 163 } 164 var needStat []blob.Ref 165 for _, br := range blobs { 166 if !br.Valid() { 167 panic("invalid blob") 168 } 169 if size, ok := c.haveCache.StatBlobCache(br); ok { 170 if err := fn(blob.SizedRef{br, size}); err != nil { 171 return err 172 } 173 } else { 174 needStat = append(needStat, br) 175 } 176 } 177 if len(needStat) == 0 { 178 return nil 179 } 180 return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, c.httpGate, func(br blob.Ref) (workerSB blob.SizedRef, err error) { 181 err = c.doStat(ctx, []blob.Ref{br}, 0, false, func(sb blob.SizedRef) error { 182 workerSB = sb 183 c.haveCache.NoteBlobExists(sb.Ref, sb.Size) 184 return fn(sb) 185 }) 186 return 187 }) 188} 189 190// doStat does an HTTP request for the stat. the number of blobs is used verbatim. No extra splitting 191// or batching is done at this layer. 192// The semantics are the same as blobserver.BlobStatter. 193// gate controls whether it uses httpGate to pause on requests. 194func (c *Client) doStat(ctx context.Context, blobs []blob.Ref, wait time.Duration, gated bool, fn func(blob.SizedRef) error) error { 195 var buf bytes.Buffer 196 fmt.Fprintf(&buf, "camliversion=1") 197 if wait > 0 { 198 secs := int(wait.Seconds()) 199 if secs == 0 { 200 secs = 1 201 } 202 fmt.Fprintf(&buf, "&maxwaitsec=%d", secs) 203 } 204 for i, blob := range blobs { 205 fmt.Fprintf(&buf, "&blob%d=%s", i+1, blob) 206 } 207 208 pfx, err := c.prefix() 209 if err != nil { 210 return err 211 } 212 req := c.newRequest(ctx, "POST", fmt.Sprintf("%s/camli/stat", pfx), &buf) 213 req.Header.Set("Content-Type", "application/x-www-form-urlencoded") 214 215 var resp *http.Response 216 if gated { 217 resp, err = c.doReqGated(req) 218 } else { 219 resp, err = c.httpClient.Do(req) 220 } 221 if err != nil { 222 return fmt.Errorf("stat HTTP error: %v", err) 223 } 224 if resp.Body != nil { 225 defer resp.Body.Close() 226 } 227 228 if resp.StatusCode != 200 { 229 return fmt.Errorf("stat response had http status %d", resp.StatusCode) 230 } 231 232 stat, err := parseStatResponse(resp) 233 if err != nil { 234 return err 235 } 236 for _, sb := range stat.HaveMap { 237 if err := fn(sb); err != nil { 238 return err 239 } 240 } 241 return nil 242} 243 244// Figure out the size of the contents. 245// If the size was provided, trust it. 246func (h *UploadHandle) readerAndSize() (io.Reader, int64, error) { 247 if h.Size > 0 { 248 return h.Contents, int64(h.Size), nil 249 } 250 var b bytes.Buffer 251 n, err := io.Copy(&b, h.Contents) 252 if err != nil { 253 return nil, 0, err 254 } 255 return &b, n, nil 256} 257 258// Upload uploads a blob, as described by the provided UploadHandle parameters. 259func (c *Client) Upload(ctx context.Context, h *UploadHandle) (*PutResult, error) { 260 errorf := func(msg string, arg ...interface{}) (*PutResult, error) { 261 err := fmt.Errorf(msg, arg...) 262 c.printf("%v", err) 263 return nil, err 264 } 265 266 bodyReader, bodySize, err := h.readerAndSize() 267 if err != nil { 268 return nil, fmt.Errorf("client: error slurping upload handle to find its length: %v", err) 269 } 270 if bodySize > constants.MaxBlobSize { 271 return nil, errors.New("client: body is bigger then max blob size") 272 } 273 274 c.statsMutex.Lock() 275 c.stats.UploadRequests.Blobs++ 276 c.stats.UploadRequests.Bytes += bodySize 277 c.statsMutex.Unlock() 278 279 pr := &PutResult{BlobRef: h.BlobRef, Size: uint32(bodySize)} 280 281 if c.sto != nil { 282 // TODO: stat first so we can show skipped? 283 _, err := blobserver.Receive(ctx, c.sto, h.BlobRef, bodyReader) 284 if err != nil { 285 return nil, err 286 } 287 return pr, nil 288 } 289 290 if !h.Vivify { 291 if _, ok := c.haveCache.StatBlobCache(h.BlobRef); ok { 292 pr.Skipped = true 293 return pr, nil 294 } 295 } 296 297 blobrefStr := h.BlobRef.String() 298 299 // Pre-upload. Check whether the blob already exists on the 300 // server and if not, the URL to upload it to. 301 pfx, err := c.prefix() 302 if err != nil { 303 return nil, err 304 } 305 306 if !h.SkipStat { 307 url_ := fmt.Sprintf("%s/camli/stat", pfx) 308 req := c.newRequest(ctx, "POST", url_, strings.NewReader("camliversion=1&blob1="+blobrefStr)) 309 req.Header.Add("Content-Type", "application/x-www-form-urlencoded") 310 311 resp, err := c.doReqGated(req) 312 if err != nil { 313 return errorf("stat http error: %v", err) 314 } 315 defer resp.Body.Close() 316 317 if resp.StatusCode != 200 { 318 return errorf("stat response had http status %d", resp.StatusCode) 319 } 320 321 stat, err := parseStatResponse(resp) 322 if err != nil { 323 return nil, err 324 } 325 for _, sbr := range stat.HaveMap { 326 c.haveCache.NoteBlobExists(sbr.Ref, uint32(sbr.Size)) 327 } 328 _, serverHasIt := stat.HaveMap[blobrefStr] 329 if env.DebugUploads() { 330 log.Printf("HTTP Stat(%s) = %v", blobrefStr, serverHasIt) 331 } 332 if !h.Vivify && serverHasIt { 333 pr.Skipped = true 334 if closer, ok := h.Contents.(io.Closer); ok { 335 // TODO(bradfitz): I did this 336 // Close-if-possible thing early on, before I 337 // knew better. Fix the callers instead, and 338 // fix the docs. 339 closer.Close() 340 } 341 c.haveCache.NoteBlobExists(h.BlobRef, uint32(bodySize)) 342 return pr, nil 343 } 344 } 345 346 if env.DebugUploads() { 347 log.Printf("Uploading: %s (%d bytes)", blobrefStr, bodySize) 348 } 349 350 pipeReader, pipeWriter := io.Pipe() 351 multipartWriter := multipart.NewWriter(pipeWriter) 352 353 copyResult := make(chan error, 1) 354 go func() { 355 defer pipeWriter.Close() 356 part, err := multipartWriter.CreateFormFile(blobrefStr, blobrefStr) 357 if err != nil { 358 copyResult <- err 359 return 360 } 361 _, err = io.Copy(part, bodyReader) 362 if err == nil { 363 err = multipartWriter.Close() 364 } 365 copyResult <- err 366 }() 367 368 uploadURL := fmt.Sprintf("%s/camli/upload", pfx) 369 req := c.newRequest(ctx, "POST", uploadURL) 370 req.Header.Set("Content-Type", multipartWriter.FormDataContentType()) 371 if h.Vivify { 372 req.Header.Add("X-Camlistore-Vivify", "1") 373 } 374 req.Body = ioutil.NopCloser(pipeReader) 375 req.ContentLength = getMultipartOverhead() + bodySize + int64(len(blobrefStr))*2 376 resp, err := c.doReqGated(req) 377 if err != nil { 378 return errorf("upload http error: %v", err) 379 } 380 defer resp.Body.Close() 381 382 // check error from earlier copy 383 if err := <-copyResult; err != nil { 384 return errorf("failed to copy contents into multipart writer: %v", err) 385 } 386 387 // The only valid HTTP responses are 200 and 303. 388 if resp.StatusCode != 200 && resp.StatusCode != 303 { 389 return errorf("invalid http response %d in upload response", resp.StatusCode) 390 } 391 392 if resp.StatusCode == 303 { 393 otherLocation := resp.Header.Get("Location") 394 if otherLocation == "" { 395 return errorf("303 without a Location") 396 } 397 baseURL, _ := url.Parse(uploadURL) 398 absURL, err := baseURL.Parse(otherLocation) 399 if err != nil { 400 return errorf("303 Location URL relative resolve error: %v", err) 401 } 402 otherLocation = absURL.String() 403 resp, err = http.Get(otherLocation) 404 if err != nil { 405 return errorf("error following 303 redirect after upload: %v", err) 406 } 407 } 408 409 var ures protocol.UploadResponse 410 if err := httputil.DecodeJSON(resp, &ures); err != nil { 411 return errorf("error in upload response: %v", err) 412 } 413 414 if ures.ErrorText != "" { 415 c.printf("Blob server reports error: %s", ures.ErrorText) 416 } 417 418 expectedSize := uint32(bodySize) 419 420 for _, sb := range ures.Received { 421 if sb.Ref != h.BlobRef { 422 continue 423 } 424 if sb.Size != expectedSize { 425 return errorf("Server got blob %v, but reports wrong length (%v; we sent %d)", 426 sb.Ref, sb.Size, expectedSize) 427 } 428 c.statsMutex.Lock() 429 c.stats.Uploads.Blobs++ 430 c.stats.Uploads.Bytes += bodySize 431 c.statsMutex.Unlock() 432 if pr.Size <= 0 { 433 pr.Size = sb.Size 434 } 435 c.haveCache.NoteBlobExists(pr.BlobRef, pr.Size) 436 return pr, nil 437 } 438 439 return nil, errors.New("server didn't receive blob") 440} 441 442// FileUploadOptions is optionally provided to UploadFile. 443type FileUploadOptions struct { 444 // FileInfo optionally specifies the FileInfo to populate the schema of the file blob. 445 FileInfo os.FileInfo 446 // WholeRef optionally specifies the digest of the uploaded file contents, which 447 // allows UploadFile to skip computing the digest (needed to check if the contents 448 // are already on the server). 449 WholeRef blob.Ref 450} 451 452// UploadFile uploads the contents of the file, as well as a file blob with 453// filename for these contents. If the contents or the file blob are found on 454// the server, they're not uploaded. 455// 456// Note: this method is still a work in progress, and might change to accommodate 457// the needs of pk-put file. 458func (c *Client) UploadFile(ctx context.Context, filename string, contents io.Reader, opts *FileUploadOptions) (blob.Ref, error) { 459 fileMap := schema.NewFileMap(filename) 460 if opts != nil && opts.FileInfo != nil { 461 fileMap = schema.NewCommonFileMap(filename, opts.FileInfo) 462 modTime := opts.FileInfo.ModTime() 463 if !modTime.IsZero() { 464 fileMap.SetModTime(modTime) 465 } 466 } 467 fileMap.SetType("file") 468 469 var wholeRef []blob.Ref 470 if opts != nil && opts.WholeRef.Valid() { 471 wholeRef = append(wholeRef, opts.WholeRef) 472 } else { 473 var buf bytes.Buffer 474 var err error 475 wholeRef, err = c.wholeRef(io.TeeReader(contents, &buf)) 476 if err != nil { 477 return blob.Ref{}, err 478 } 479 contents = io.MultiReader(&buf, contents) 480 } 481 482 fileRef, err := c.fileMapFromDuplicate(ctx, fileMap, wholeRef) 483 if err != nil { 484 return blob.Ref{}, err 485 } 486 if fileRef.Valid() { 487 return fileRef, nil 488 } 489 490 return schema.WriteFileMap(ctx, c, fileMap, contents) 491} 492 493// TODO(mpl): replace up.wholeFileDigest in pk-put with c.wholeRef maybe. 494 495// wholeRef returns the blob ref(s) of the regular file's contents 496// as if it were one entire blob (ignoring blob size limits). 497// By default, only one ref is returned, unless the server has advertised 498// that it has indexes calculated for other hash functions. 499func (c *Client) wholeRef(contents io.Reader) ([]blob.Ref, error) { 500 hasLegacySHA1, err := c.HasLegacySHA1() 501 if err != nil { 502 return nil, fmt.Errorf("cannot discover if server has legacy sha1: %v", err) 503 } 504 td := hashutil.NewTrackDigestReader(contents) 505 td.DoLegacySHA1 = hasLegacySHA1 506 if _, err := io.Copy(ioutil.Discard, td); err != nil { 507 return nil, err 508 } 509 refs := []blob.Ref{blob.RefFromHash(td.Hash())} 510 if td.DoLegacySHA1 { 511 refs = append(refs, blob.RefFromHash(td.LegacySHA1Hash())) 512 } 513 return refs, nil 514} 515 516// fileMapFromDuplicate queries the server's search interface for an 517// existing file blob for the file contents any of wholeRef. 518// If the server has it, it's validated, and then fileMap (which must 519// already be partially populated) has its "parts" field populated, 520// and then fileMap is uploaded (if necessary). 521// If no file blob is found, a zero blob.Ref (and no error) is returned. 522func (c *Client) fileMapFromDuplicate(ctx context.Context, fileMap *schema.Builder, wholeRef []blob.Ref) (blob.Ref, error) { 523 dupFileRef, err := c.SearchExistingFileSchema(ctx, wholeRef...) 524 if err != nil { 525 return blob.Ref{}, err 526 } 527 if !dupFileRef.Valid() { 528 // because SearchExistingFileSchema returns blob.Ref{}, nil when file is not found. 529 return blob.Ref{}, nil 530 } 531 dupMap, err := c.FetchSchemaBlob(ctx, dupFileRef) 532 if err != nil { 533 return blob.Ref{}, fmt.Errorf("could not find existing file blob for wholeRef %q: %v", wholeRef, err) 534 } 535 fileMap.PopulateParts(dupMap.PartsSize(), dupMap.ByteParts()) 536 json, err := fileMap.JSON() 537 if err != nil { 538 return blob.Ref{}, fmt.Errorf("could not write file map for wholeRef %q: %v", wholeRef, err) 539 } 540 bref := blob.RefFromString(json) 541 if bref == dupFileRef { 542 // Unchanged (same filename, modtime, JSON serialization, etc) 543 // Different signer (e.g. existing file has a sha1 signer, and 544 // we're now using a sha224 signer) means we upload a new file schema. 545 return dupFileRef, nil 546 } 547 sbr, err := c.ReceiveBlob(ctx, bref, strings.NewReader(json)) 548 if err != nil { 549 return blob.Ref{}, err 550 } 551 return sbr.Ref, nil 552} 553