1//
2// Copyright (c) 2018, Joyent, Inc. All rights reserved.
3//
4// This Source Code Form is subject to the terms of the Mozilla Public
5// License, v. 2.0. If a copy of the MPL was not distributed with this
6// file, You can obtain one at http://mozilla.org/MPL/2.0/.
7//
8
9package main
10
11import (
12	"bufio"
13	"context"
14	"fmt"
15	"io/ioutil"
16	"log"
17	"os"
18	"time"
19
20	"encoding/pem"
21
22	triton "github.com/joyent/triton-go"
23	"github.com/joyent/triton-go/authentication"
24	"github.com/joyent/triton-go/storage"
25)
26
27func main() {
28	keyID := os.Getenv("TRITON_KEY_ID")
29	accountName := os.Getenv("TRITON_ACCOUNT")
30	keyMaterial := os.Getenv("TRITON_KEY_MATERIAL")
31	userName := os.Getenv("TRITON_USER")
32
33	var signer authentication.Signer
34	var err error
35
36	if keyMaterial == "" {
37		input := authentication.SSHAgentSignerInput{
38			KeyID:       keyID,
39			AccountName: accountName,
40			Username:    userName,
41		}
42		signer, err = authentication.NewSSHAgentSigner(input)
43		if err != nil {
44			log.Fatalf("Error Creating SSH Agent Signer: %v", err)
45		}
46	} else {
47		var keyBytes []byte
48		if _, err = os.Stat(keyMaterial); err == nil {
49			keyBytes, err = ioutil.ReadFile(keyMaterial)
50			if err != nil {
51				log.Fatalf("Error reading key material from %s: %s",
52					keyMaterial, err)
53			}
54			block, _ := pem.Decode(keyBytes)
55			if block == nil {
56				log.Fatalf(
57					"Failed to read key material '%s': no key found", keyMaterial)
58			}
59
60			if block.Headers["Proc-Type"] == "4,ENCRYPTED" {
61				log.Fatalf(
62					"Failed to read key '%s': password protected keys are\n"+
63						"not currently supported. Please decrypt the key prior to use.", keyMaterial)
64			}
65
66		} else {
67			keyBytes = []byte(keyMaterial)
68		}
69
70		input := authentication.PrivateKeySignerInput{
71			KeyID:              keyID,
72			PrivateKeyMaterial: keyBytes,
73			AccountName:        accountName,
74			Username:           userName,
75		}
76		signer, err = authentication.NewPrivateKeySigner(input)
77		if err != nil {
78			log.Fatalf("Error Creating SSH Private Key Signer: %v", err)
79		}
80	}
81
82	config := &triton.ClientConfig{
83		MantaURL:    os.Getenv("TRITON_URL"),
84		AccountName: accountName,
85		Username:    userName,
86		Signers:     []authentication.Signer{signer},
87	}
88
89	client, err := storage.NewClient(config)
90	if err != nil {
91		log.Fatalf("NewClient: %v", err)
92	}
93
94	job, err := client.Jobs().Create(context.Background(), &storage.CreateJobInput{
95		Name: "WordCount",
96		Phases: []*storage.JobPhase{
97			{
98				Type: "map",
99				Exec: "wc",
100			},
101			{
102				Type: "reduce",
103				Exec: "awk '{ l += $1; w += $2; c += $3 } END { print l, w, c }'",
104			},
105		},
106	})
107	if err != nil {
108		log.Fatalf("CreateJob: %v", err)
109	}
110
111	fmt.Printf("Job ID: %s\n", job.JobID)
112
113	err = client.Jobs().AddInputs(context.Background(), &storage.AddJobInputsInput{
114		JobID: job.JobID,
115		ObjectPaths: []string{
116			fmt.Sprintf("/%s/stor/books/treasure_island.txt", accountName),
117			fmt.Sprintf("/%s/stor/books/moby_dick.txt", accountName),
118			fmt.Sprintf("/%s/stor/books/huck_finn.txt", accountName),
119			fmt.Sprintf("/%s/stor/books/dracula.txt", accountName),
120		},
121	})
122	if err != nil {
123		log.Fatalf("AddJobInputs: %v", err)
124	}
125
126	err = client.Jobs().AddInputs(context.Background(), &storage.AddJobInputsInput{
127		JobID: job.JobID,
128		ObjectPaths: []string{
129			fmt.Sprintf("/%s/stor/books/sherlock_holmes.txt", accountName),
130		},
131	})
132	if err != nil {
133		log.Fatalf("AddJobInputs: %v", err)
134	}
135
136	gjo, err := client.Jobs().Get(context.Background(), &storage.GetJobInput{
137		JobID: job.JobID,
138	})
139	if err != nil {
140		log.Fatalf("GetJob: %v", err)
141	}
142
143	fmt.Printf("%+v\n", gjo.Job)
144	fmt.Printf("%+v\n", gjo.Job.Stats)
145
146	err = client.Jobs().EndInput(context.Background(), &storage.EndJobInputInput{
147		JobID: job.JobID,
148	})
149	if err != nil {
150		log.Fatalf("EndJobInput: %v", err)
151	}
152
153	jobs, err := client.Jobs().List(context.Background(), &storage.ListJobsInput{})
154	if err != nil {
155		log.Fatalf("ListJobs: %v", err)
156	}
157
158	fmt.Printf("Number of jobs: %d\n", jobs.ResultSetSize)
159	for _, j := range jobs.Jobs {
160		fmt.Printf(" - %s\n", j.ID)
161	}
162
163	gjio, err := client.Jobs().GetInput(context.Background(), &storage.GetJobInputInput{
164		JobID: job.JobID,
165	})
166	if err != nil {
167		log.Fatalf("GetJobInput: %v", err)
168	}
169	defer gjio.Items.Close()
170
171	fmt.Printf("Result set size: %d\n", gjio.ResultSetSize)
172	outputsScanner := bufio.NewScanner(gjio.Items)
173	for outputsScanner.Scan() {
174		fmt.Printf(" - %s\n", outputsScanner.Text())
175	}
176
177	time.Sleep(10 * time.Second)
178
179	gjoo, err := client.Jobs().GetOutput(context.Background(), &storage.GetJobOutputInput{
180		JobID: job.JobID,
181	})
182	if err != nil {
183		log.Fatalf("GetJobOutput: %v", err)
184	}
185	defer gjoo.Items.Close()
186
187	fmt.Printf("Result set size: %d\n", gjoo.ResultSetSize)
188	outputsScanner = bufio.NewScanner(gjoo.Items)
189	for outputsScanner.Scan() {
190		fmt.Printf(" - %s\n", outputsScanner.Text())
191	}
192}
193