1// 2// Copyright (c) 2018, Joyent, Inc. All rights reserved. 3// 4// This Source Code Form is subject to the terms of the Mozilla Public 5// License, v. 2.0. If a copy of the MPL was not distributed with this 6// file, You can obtain one at http://mozilla.org/MPL/2.0/. 7// 8 9package main 10 11import ( 12 "bufio" 13 "context" 14 "fmt" 15 "io/ioutil" 16 "log" 17 "os" 18 "time" 19 20 "encoding/pem" 21 22 triton "github.com/joyent/triton-go" 23 "github.com/joyent/triton-go/authentication" 24 "github.com/joyent/triton-go/storage" 25) 26 27func main() { 28 keyID := os.Getenv("TRITON_KEY_ID") 29 accountName := os.Getenv("TRITON_ACCOUNT") 30 keyMaterial := os.Getenv("TRITON_KEY_MATERIAL") 31 userName := os.Getenv("TRITON_USER") 32 33 var signer authentication.Signer 34 var err error 35 36 if keyMaterial == "" { 37 input := authentication.SSHAgentSignerInput{ 38 KeyID: keyID, 39 AccountName: accountName, 40 Username: userName, 41 } 42 signer, err = authentication.NewSSHAgentSigner(input) 43 if err != nil { 44 log.Fatalf("Error Creating SSH Agent Signer: %v", err) 45 } 46 } else { 47 var keyBytes []byte 48 if _, err = os.Stat(keyMaterial); err == nil { 49 keyBytes, err = ioutil.ReadFile(keyMaterial) 50 if err != nil { 51 log.Fatalf("Error reading key material from %s: %s", 52 keyMaterial, err) 53 } 54 block, _ := pem.Decode(keyBytes) 55 if block == nil { 56 log.Fatalf( 57 "Failed to read key material '%s': no key found", keyMaterial) 58 } 59 60 if block.Headers["Proc-Type"] == "4,ENCRYPTED" { 61 log.Fatalf( 62 "Failed to read key '%s': password protected keys are\n"+ 63 "not currently supported. Please decrypt the key prior to use.", keyMaterial) 64 } 65 66 } else { 67 keyBytes = []byte(keyMaterial) 68 } 69 70 input := authentication.PrivateKeySignerInput{ 71 KeyID: keyID, 72 PrivateKeyMaterial: keyBytes, 73 AccountName: accountName, 74 Username: userName, 75 } 76 signer, err = authentication.NewPrivateKeySigner(input) 77 if err != nil { 78 log.Fatalf("Error Creating SSH Private Key Signer: %v", err) 79 } 80 } 81 82 config := &triton.ClientConfig{ 83 MantaURL: os.Getenv("TRITON_URL"), 84 AccountName: accountName, 85 Username: userName, 86 Signers: []authentication.Signer{signer}, 87 } 88 89 client, err := storage.NewClient(config) 90 if err != nil { 91 log.Fatalf("NewClient: %v", err) 92 } 93 94 job, err := client.Jobs().Create(context.Background(), &storage.CreateJobInput{ 95 Name: "WordCount", 96 Phases: []*storage.JobPhase{ 97 { 98 Type: "map", 99 Exec: "wc", 100 }, 101 { 102 Type: "reduce", 103 Exec: "awk '{ l += $1; w += $2; c += $3 } END { print l, w, c }'", 104 }, 105 }, 106 }) 107 if err != nil { 108 log.Fatalf("CreateJob: %v", err) 109 } 110 111 fmt.Printf("Job ID: %s\n", job.JobID) 112 113 err = client.Jobs().AddInputs(context.Background(), &storage.AddJobInputsInput{ 114 JobID: job.JobID, 115 ObjectPaths: []string{ 116 fmt.Sprintf("/%s/stor/books/treasure_island.txt", accountName), 117 fmt.Sprintf("/%s/stor/books/moby_dick.txt", accountName), 118 fmt.Sprintf("/%s/stor/books/huck_finn.txt", accountName), 119 fmt.Sprintf("/%s/stor/books/dracula.txt", accountName), 120 }, 121 }) 122 if err != nil { 123 log.Fatalf("AddJobInputs: %v", err) 124 } 125 126 err = client.Jobs().AddInputs(context.Background(), &storage.AddJobInputsInput{ 127 JobID: job.JobID, 128 ObjectPaths: []string{ 129 fmt.Sprintf("/%s/stor/books/sherlock_holmes.txt", accountName), 130 }, 131 }) 132 if err != nil { 133 log.Fatalf("AddJobInputs: %v", err) 134 } 135 136 gjo, err := client.Jobs().Get(context.Background(), &storage.GetJobInput{ 137 JobID: job.JobID, 138 }) 139 if err != nil { 140 log.Fatalf("GetJob: %v", err) 141 } 142 143 fmt.Printf("%+v\n", gjo.Job) 144 fmt.Printf("%+v\n", gjo.Job.Stats) 145 146 err = client.Jobs().EndInput(context.Background(), &storage.EndJobInputInput{ 147 JobID: job.JobID, 148 }) 149 if err != nil { 150 log.Fatalf("EndJobInput: %v", err) 151 } 152 153 jobs, err := client.Jobs().List(context.Background(), &storage.ListJobsInput{}) 154 if err != nil { 155 log.Fatalf("ListJobs: %v", err) 156 } 157 158 fmt.Printf("Number of jobs: %d\n", jobs.ResultSetSize) 159 for _, j := range jobs.Jobs { 160 fmt.Printf(" - %s\n", j.ID) 161 } 162 163 gjio, err := client.Jobs().GetInput(context.Background(), &storage.GetJobInputInput{ 164 JobID: job.JobID, 165 }) 166 if err != nil { 167 log.Fatalf("GetJobInput: %v", err) 168 } 169 defer gjio.Items.Close() 170 171 fmt.Printf("Result set size: %d\n", gjio.ResultSetSize) 172 outputsScanner := bufio.NewScanner(gjio.Items) 173 for outputsScanner.Scan() { 174 fmt.Printf(" - %s\n", outputsScanner.Text()) 175 } 176 177 time.Sleep(10 * time.Second) 178 179 gjoo, err := client.Jobs().GetOutput(context.Background(), &storage.GetJobOutputInput{ 180 JobID: job.JobID, 181 }) 182 if err != nil { 183 log.Fatalf("GetJobOutput: %v", err) 184 } 185 defer gjoo.Items.Close() 186 187 fmt.Printf("Result set size: %d\n", gjoo.ResultSetSize) 188 outputsScanner = bufio.NewScanner(gjoo.Items) 189 for outputsScanner.Scan() { 190 fmt.Printf(" - %s\n", outputsScanner.Text()) 191 } 192} 193