/*
Copyright 2011 The Perkeep Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"mime/multipart"
	"net/http"
	"net/url"
	"os"
	"strings"
	"sync"
	"time"

	"perkeep.org/internal/hashutil"
	"perkeep.org/internal/httputil"
	"perkeep.org/pkg/blob"
	"perkeep.org/pkg/blobserver"
	"perkeep.org/pkg/blobserver/protocol"
	"perkeep.org/pkg/constants"
	"perkeep.org/pkg/env"
	"perkeep.org/pkg/schema"
)

// UploadHandle contains the parameters of a request to upload a blob.
type UploadHandle struct {
	// BlobRef is the required blobref of the blob to upload.
	BlobRef blob.Ref

	// Contents is the blob data.
	Contents io.Reader

	// Size optionally specifies the size of Contents.
	// If <= 0, the Contents are slurped into memory to count the size.
	Size uint32

	// Vivify optionally instructs the server to create a
	// permanode for this blob. If used, the blob should be a
	// "file" schema blob. This is typically used by
	// lesser-trusted clients (such as mobile phones) which don't
	// have rights to do signing directly.
	Vivify bool

	// SkipStat indicates whether the stat check (checking whether
	// the server already has the blob) will be skipped and the
	// blob should be uploaded immediately. This is useful for
	// small blobs that the server is unlikely to already have
	// (e.g. new claims).
	SkipStat bool
}

type PutResult struct {
	BlobRef blob.Ref
	Size    uint32
	Skipped bool // already present on blobserver
}

func (pr *PutResult) SizedBlobRef() blob.SizedRef {
	return blob.SizedRef{Ref: pr.BlobRef, Size: pr.Size}
}

// TODO: ditch this type and use protocol.StatResponse directly?
// Or at least make HaveMap keyed by a blob.Ref instead of a string.
type statResponse struct {
	HaveMap     map[string]blob.SizedRef
	canLongPoll bool
}

type ResponseFormatError error

var (
	multipartOnce     sync.Once
	multipartOverhead int64
)

// getMultipartOverhead returns how many extra bytes mime/multipart's
// Writer adds around content.
func getMultipartOverhead() int64 {
	multipartOnce.Do(func() {
		var b bytes.Buffer
		w := multipart.NewWriter(&b)
		part, _ := w.CreateFormFile("0", "0")

		dummyContents := []byte("0")
		part.Write(dummyContents)

		w.Close()
		// Subtract the 1 dummy content byte and the two 1-byte
		// field and file names, leaving only the framing overhead.
		multipartOverhead = int64(b.Len()) - 3
	})
	return multipartOverhead
}

func parseStatResponse(res *http.Response) (*statResponse, error) {
	var s = &statResponse{HaveMap: make(map[string]blob.SizedRef)}
	var pres protocol.StatResponse
	if err := httputil.DecodeJSON(res, &pres); err != nil {
		return nil, ResponseFormatError(err)
	}

	s.canLongPoll = pres.CanLongPoll
	for _, statItem := range pres.Stat {
		br := statItem.Ref
		if !br.Valid() {
			continue
		}
		s.HaveMap[br.String()] = blob.SizedRef{Ref: br, Size: uint32(statItem.Size)}
	}
	return s, nil
}

// NewUploadHandleFromString returns an upload handle for the provided
// string contents, with its blobref computed from the data.
func NewUploadHandleFromString(data string) *UploadHandle {
	bref := blob.RefFromString(data)
	r := strings.NewReader(data)
	return &UploadHandle{BlobRef: bref, Size: uint32(len(data)), Contents: r}
}
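
// An illustrative use (a sketch; assumes an initialized *Client named c
// and a context.Context named ctx, neither shown here):
//
//	h := NewUploadHandleFromString("foo")
//	if _, err := c.Upload(ctx, h); err != nil {
//		log.Printf("upload failed: %v", err)
//	}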

// TODO(bradfitz): delete most of this. use new camlistore.org/pkg/blobserver/protocol types instead
// of a map[string]interface{}.
func (c *Client) responseJSONMap(requestName string, resp *http.Response) (map[string]interface{}, error) {
	if resp.StatusCode != 200 {
		c.printf("After %s request, failed to parse JSON from response; status code is %d", requestName, resp.StatusCode)
		io.Copy(os.Stderr, resp.Body)
		return nil, fmt.Errorf("after %s request, HTTP response code is %d; no JSON to parse", requestName, resp.StatusCode)
	}
	jmap := make(map[string]interface{})
	if err := httputil.DecodeJSON(resp, &jmap); err != nil {
		return nil, err
	}
	return jmap, nil
}

// statReq is a request to stat a blob.
type statReq struct {
	br   blob.Ref
	dest chan<- blob.SizedRef // written to on success
	errc chan<- error         // written to on both failure and success (after any dest)
}

func (c *Client) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
	if c.sto != nil {
		return c.sto.StatBlobs(ctx, blobs, fn)
	}
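	// Serve whatever the local have-cache already knows, and collect
	// the misses to stat over HTTP below.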
	var needStat []blob.Ref
	for _, br := range blobs {
		if !br.Valid() {
			panic("invalid blob")
		}
		if size, ok := c.haveCache.StatBlobCache(br); ok {
			if err := fn(blob.SizedRef{Ref: br, Size: size}); err != nil {
				return err
			}
		} else {
			needStat = append(needStat, br)
		}
	}
	if len(needStat) == 0 {
		return nil
	}
	return blobserver.StatBlobsParallelHelper(ctx, needStat, fn, c.httpGate, func(br blob.Ref) (workerSB blob.SizedRef, err error) {
		err = c.doStat(ctx, []blob.Ref{br}, 0, false, func(sb blob.SizedRef) error {
			workerSB = sb
			c.haveCache.NoteBlobExists(sb.Ref, sb.Size)
			return fn(sb)
		})
		return
	})
}
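
// An illustrative StatBlobs call (a sketch; refs is a hypothetical
// []blob.Ref already populated by the caller):
//
//	sizes := map[blob.Ref]uint32{}
//	err := c.StatBlobs(ctx, refs, func(sb blob.SizedRef) error {
//		sizes[sb.Ref] = sb.Size
//		return nil
//	})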

// doStat does an HTTP request for the stat. The number of blobs is used verbatim. No extra splitting
// or batching is done at this layer.
// The semantics are the same as blobserver.BlobStatter.
// gated controls whether it uses httpGate to pause on requests.
func (c *Client) doStat(ctx context.Context, blobs []blob.Ref, wait time.Duration, gated bool, fn func(blob.SizedRef) error) error {
	var buf bytes.Buffer
	fmt.Fprintf(&buf, "camliversion=1")
	if wait > 0 {
		secs := int(wait.Seconds())
		if secs == 0 {
			secs = 1
		}
		fmt.Fprintf(&buf, "&maxwaitsec=%d", secs)
	}
	for i, blob := range blobs {
		fmt.Fprintf(&buf, "&blob%d=%s", i+1, blob)
	}

	pfx, err := c.prefix()
	if err != nil {
		return err
	}
	req := c.newRequest(ctx, "POST", fmt.Sprintf("%s/camli/stat", pfx), &buf)
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

	var resp *http.Response
	if gated {
		resp, err = c.doReqGated(req)
	} else {
		resp, err = c.httpClient.Do(req)
	}
	if err != nil {
		return fmt.Errorf("stat HTTP error: %v", err)
	}
	if resp.Body != nil {
		defer resp.Body.Close()
	}

	if resp.StatusCode != 200 {
		return fmt.Errorf("stat response had http status %d", resp.StatusCode)
	}

	stat, err := parseStatResponse(resp)
	if err != nil {
		return err
	}
	for _, sb := range stat.HaveMap {
		if err := fn(sb); err != nil {
			return err
		}
	}
	return nil
}

// readerAndSize returns the contents reader and its size.
// If h.Size was provided, it is trusted; otherwise the contents
// are slurped into memory to count them.
func (h *UploadHandle) readerAndSize() (io.Reader, int64, error) {
	if h.Size > 0 {
		return h.Contents, int64(h.Size), nil
	}
	var b bytes.Buffer
	n, err := io.Copy(&b, h.Contents)
	if err != nil {
		return nil, 0, err
	}
	return &b, n, nil
}

// Upload uploads a blob, as described by the provided UploadHandle parameters.
func (c *Client) Upload(ctx context.Context, h *UploadHandle) (*PutResult, error) {
	errorf := func(msg string, arg ...interface{}) (*PutResult, error) {
		err := fmt.Errorf(msg, arg...)
		c.printf("%v", err)
		return nil, err
	}

	bodyReader, bodySize, err := h.readerAndSize()
	if err != nil {
		return nil, fmt.Errorf("client: error slurping upload handle to find its length: %v", err)
	}
	if bodySize > constants.MaxBlobSize {
		return nil, errors.New("client: body is bigger than max blob size")
	}

	c.statsMutex.Lock()
	c.stats.UploadRequests.Blobs++
	c.stats.UploadRequests.Bytes += bodySize
	c.statsMutex.Unlock()

	pr := &PutResult{BlobRef: h.BlobRef, Size: uint32(bodySize)}

	if c.sto != nil {
		// TODO: stat first so we can show skipped?
		_, err := blobserver.Receive(ctx, c.sto, h.BlobRef, bodyReader)
		if err != nil {
			return nil, err
		}
		return pr, nil
	}

	if !h.Vivify {
		if _, ok := c.haveCache.StatBlobCache(h.BlobRef); ok {
			pr.Skipped = true
			return pr, nil
		}
	}

	blobrefStr := h.BlobRef.String()

	// Pre-upload. Check whether the blob already exists on the
	// server and, if not, get the URL to upload it to.
	pfx, err := c.prefix()
	if err != nil {
		return nil, err
	}

	if !h.SkipStat {
		url_ := fmt.Sprintf("%s/camli/stat", pfx)
		req := c.newRequest(ctx, "POST", url_, strings.NewReader("camliversion=1&blob1="+blobrefStr))
		req.Header.Add("Content-Type", "application/x-www-form-urlencoded")

		resp, err := c.doReqGated(req)
		if err != nil {
			return errorf("stat http error: %v", err)
		}
		defer resp.Body.Close()

		if resp.StatusCode != 200 {
			return errorf("stat response had http status %d", resp.StatusCode)
		}

		stat, err := parseStatResponse(resp)
		if err != nil {
			return nil, err
		}
		for _, sbr := range stat.HaveMap {
			c.haveCache.NoteBlobExists(sbr.Ref, uint32(sbr.Size))
		}
		_, serverHasIt := stat.HaveMap[blobrefStr]
		if env.DebugUploads() {
			log.Printf("HTTP Stat(%s) = %v", blobrefStr, serverHasIt)
		}
		if !h.Vivify && serverHasIt {
			pr.Skipped = true
			if closer, ok := h.Contents.(io.Closer); ok {
				// TODO(bradfitz): I did this
				// Close-if-possible thing early on, before I
				// knew better.  Fix the callers instead, and
				// fix the docs.
				closer.Close()
			}
			c.haveCache.NoteBlobExists(h.BlobRef, uint32(bodySize))
			return pr, nil
		}
	}

	if env.DebugUploads() {
		log.Printf("Uploading: %s (%d bytes)", blobrefStr, bodySize)
	}

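	// Stream the multipart body through a pipe so the blob is never
	// buffered wholly in memory; the writer goroutine reports its
	// result on copyResult.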
	pipeReader, pipeWriter := io.Pipe()
	multipartWriter := multipart.NewWriter(pipeWriter)

	copyResult := make(chan error, 1)
	go func() {
		defer pipeWriter.Close()
		part, err := multipartWriter.CreateFormFile(blobrefStr, blobrefStr)
		if err != nil {
			copyResult <- err
			return
		}
		_, err = io.Copy(part, bodyReader)
		if err == nil {
			err = multipartWriter.Close()
		}
		copyResult <- err
	}()

	uploadURL := fmt.Sprintf("%s/camli/upload", pfx)
	req := c.newRequest(ctx, "POST", uploadURL)
	req.Header.Set("Content-Type", multipartWriter.FormDataContentType())
	if h.Vivify {
		req.Header.Add("X-Camlistore-Vivify", "1")
	}
	req.Body = ioutil.NopCloser(pipeReader)
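	// Set the exact length up front so the request isn't sent chunked:
	// the fixed multipart framing overhead, plus the blob bytes, plus
	// the blobref appearing twice (as the field name and the filename).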
	req.ContentLength = getMultipartOverhead() + bodySize + int64(len(blobrefStr))*2
	resp, err := c.doReqGated(req)
	if err != nil {
		return errorf("upload http error: %v", err)
	}
	defer resp.Body.Close()

	// Check the error from the copy goroutine above.
	if err := <-copyResult; err != nil {
		return errorf("failed to copy contents into multipart writer: %v", err)
	}

	// The only valid HTTP responses are 200 and 303.
	if resp.StatusCode != 200 && resp.StatusCode != 303 {
		return errorf("invalid http response %d in upload response", resp.StatusCode)
	}

	if resp.StatusCode == 303 {
		otherLocation := resp.Header.Get("Location")
		if otherLocation == "" {
			return errorf("303 without a Location")
		}
		baseURL, _ := url.Parse(uploadURL)
		absURL, err := baseURL.Parse(otherLocation)
		if err != nil {
			return errorf("303 Location URL relative resolve error: %v", err)
		}
		otherLocation = absURL.String()
		resp, err = http.Get(otherLocation)
		if err != nil {
			return errorf("error following 303 redirect after upload: %v", err)
		}
	}

	var ures protocol.UploadResponse
	if err := httputil.DecodeJSON(resp, &ures); err != nil {
		return errorf("error in upload response: %v", err)
	}

	if ures.ErrorText != "" {
		c.printf("Blob server reports error: %s", ures.ErrorText)
	}

	expectedSize := uint32(bodySize)

	for _, sb := range ures.Received {
		if sb.Ref != h.BlobRef {
			continue
		}
		if sb.Size != expectedSize {
			return errorf("server got blob %v, but reports wrong length (%v; we sent %d)",
				sb.Ref, sb.Size, expectedSize)
		}
		c.statsMutex.Lock()
		c.stats.Uploads.Blobs++
		c.stats.Uploads.Bytes += bodySize
		c.statsMutex.Unlock()
		if pr.Size == 0 {
			pr.Size = sb.Size
		}
		c.haveCache.NoteBlobExists(pr.BlobRef, pr.Size)
		return pr, nil
	}

	return nil, errors.New("server didn't receive blob")
}

// FileUploadOptions is optionally provided to UploadFile.
type FileUploadOptions struct {
	// FileInfo optionally specifies the FileInfo to populate the schema of the file blob.
	FileInfo os.FileInfo
	// WholeRef optionally specifies the digest of the uploaded file contents, which
	// allows UploadFile to skip computing the digest (needed to check if the contents
	// are already on the server).
	WholeRef blob.Ref
}

// UploadFile uploads the contents of the file, as well as a file blob with
// filename for these contents. If the contents or the file blob are found on
// the server, they're not uploaded.
//
// Note: this method is still a work in progress, and might change to accommodate
// the needs of pk-put file.
func (c *Client) UploadFile(ctx context.Context, filename string, contents io.Reader, opts *FileUploadOptions) (blob.Ref, error) {
	fileMap := schema.NewFileMap(filename)
	if opts != nil && opts.FileInfo != nil {
		fileMap = schema.NewCommonFileMap(filename, opts.FileInfo)
		modTime := opts.FileInfo.ModTime()
		if !modTime.IsZero() {
			fileMap.SetModTime(modTime)
		}
	}
	fileMap.SetType("file")

	var wholeRef []blob.Ref
	if opts != nil && opts.WholeRef.Valid() {
		wholeRef = append(wholeRef, opts.WholeRef)
	} else {
		var buf bytes.Buffer
		var err error
		wholeRef, err = c.wholeRef(io.TeeReader(contents, &buf))
		if err != nil {
			return blob.Ref{}, err
		}
		contents = io.MultiReader(&buf, contents)
	}

	fileRef, err := c.fileMapFromDuplicate(ctx, fileMap, wholeRef)
	if err != nil {
		return blob.Ref{}, err
	}
	if fileRef.Valid() {
		return fileRef, nil
	}

	return schema.WriteFileMap(ctx, c, fileMap, contents)
}
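
// An illustrative call (a sketch; f is a hypothetical open *os.File and
// fi its os.FileInfo):
//
//	ref, err := c.UploadFile(ctx, fi.Name(), f, &FileUploadOptions{FileInfo: fi})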

// TODO(mpl): replace up.wholeFileDigest in pk-put with c.wholeRef maybe.

// wholeRef returns the blob ref(s) of the regular file's contents
// as if it were one entire blob (ignoring blob size limits).
// By default, only one ref is returned, unless the server has advertised
// that it has indexes calculated for other hash functions.
func (c *Client) wholeRef(contents io.Reader) ([]blob.Ref, error) {
	hasLegacySHA1, err := c.HasLegacySHA1()
	if err != nil {
		return nil, fmt.Errorf("cannot discover if server has legacy sha1: %v", err)
	}
	td := hashutil.NewTrackDigestReader(contents)
	td.DoLegacySHA1 = hasLegacySHA1
	if _, err := io.Copy(ioutil.Discard, td); err != nil {
		return nil, err
	}
	refs := []blob.Ref{blob.RefFromHash(td.Hash())}
	if td.DoLegacySHA1 {
		refs = append(refs, blob.RefFromHash(td.LegacySHA1Hash()))
	}
	return refs, nil
}

// fileMapFromDuplicate queries the server's search interface for an
// existing file blob whose contents match any of wholeRef.
// If the server has it, it's validated, and then fileMap (which must
// already be partially populated) has its "parts" field populated,
// and then fileMap is uploaded (if necessary).
// If no file blob is found, a zero blob.Ref (and no error) is returned.
func (c *Client) fileMapFromDuplicate(ctx context.Context, fileMap *schema.Builder, wholeRef []blob.Ref) (blob.Ref, error) {
	dupFileRef, err := c.SearchExistingFileSchema(ctx, wholeRef...)
	if err != nil {
		return blob.Ref{}, err
	}
	if !dupFileRef.Valid() {
		// because SearchExistingFileSchema returns blob.Ref{}, nil when the file is not found.
		return blob.Ref{}, nil
	}
	dupMap, err := c.FetchSchemaBlob(ctx, dupFileRef)
	if err != nil {
		return blob.Ref{}, fmt.Errorf("could not find existing file blob for wholeRef %q: %v", wholeRef, err)
	}
	fileMap.PopulateParts(dupMap.PartsSize(), dupMap.ByteParts())
	json, err := fileMap.JSON()
	if err != nil {
		return blob.Ref{}, fmt.Errorf("could not write file map for wholeRef %q: %v", wholeRef, err)
	}
	bref := blob.RefFromString(json)
	if bref == dupFileRef {
		// Unchanged (same filename, modtime, JSON serialization, etc).
		return dupFileRef, nil
	}
	// A different signer (e.g. the existing file schema was created by a
	// sha1 signer, and we're now using a sha224 signer) changes the JSON,
	// so we upload the new file schema.
	sbr, err := c.ReceiveBlob(ctx, bref, strings.NewReader(json))
	if err != nil {
		return blob.Ref{}, err
	}
	return sbr.Ref, nil
}