Example Logsink Client

The other client that I’ve written recently is one that can connect to our ‘/logsink’ API, which streams JSON-formatted messages into a websocket connection. It's a bit more complex than my other API client, because the streamConnector in our Juju code is all private methods. The actual ‘main()’ function is pretty tight.

I could see turning something like this into a CLI utility that would let you just cat lines into stdin and have it send those as messages to the Juju log interface. The one caveat is that the actual interface is a structured LogRecord with things like timestamp, log level, etc. as real information, rather than just part of a string.

Also, this particular client intentionally misbehaves on the connection, because I was trying to get the controller to misbehave based on a poorly behaving client. But maybe it can be a starting point for someone else:

package main

import (
	"bufio"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"strings"
	"time"

	"github.com/gorilla/websocket"
	"github.com/juju/errors"
	"github.com/juju/gnuflag"
	"github.com/juju/utils"

	"github.com/juju/juju/api/base"
	"github.com/juju/juju/api/logsender"
	"github.com/juju/juju/apiserver/params"
)

// Command-line flags selecting which agent to authenticate as and which
// API server and model to send log records to.
var (
	agent    = gnuflag.String("agent", "machine-0", "set the agent name that we will connect as")
	nonce    = gnuflag.String("nonce", "user-admin:bootstrap", "machine agents need to pass the machine nonce")
	password = gnuflag.String("password", "", "the password for this agent")
	host     = gnuflag.String("host", "localhost", "the host to connect to")
	port     = gnuflag.Int("port", 17070, "the apiserver port")
	uuid     = gnuflag.String("uuid", "", "model-uuid to connect to")
)

// streamConnector bundles the address, model and credentials needed to
// open an authenticated websocket stream against the API server.
type streamConnector struct {
	// host and port identify the API server; Addr joins them.
	host string
	port int
	// modelUUID is inserted into the request path by apiPath.
	modelUUID string

	// agent and password are used to build the authentication header.
	agent    string
	password string
	// nonce is sent as the X-Juju-Nonce header when it is non-empty.
	nonce string
}

// apiPath returns path placed under the "/model/<modelUUID>" root.
// path is appended verbatim with no separator, so it must already begin
// with a slash.
func (s streamConnector) apiPath(path string) string {
	return "/model/" + s.modelUUID + path
}

// Addr returns the API server address in "host:port" form, as produced by
// net.JoinHostPort.
func (s streamConnector) Addr() string {
	portText := fmt.Sprintf("%d", s.port)
	return net.JoinHostPort(s.host, portText)
}

// tlsConfig returns the TLS settings used when dialing the API server.
// It starts from utils.SecureTLSConfig (which the original author notes
// provides good cipher suites and versions) and then overrides the server
// name and certificate verification.
func (s streamConnector) tlsConfig() *tls.Config {
	cfg := utils.SecureTLSConfig()
	cfg.ServerName = "juju-apiserver"
	// NOTE(review): certificate verification is switched off, so the
	// server's identity is never checked. Tolerable for a throwaway test
	// client; do not copy into anything that talks to a real controller.
	cfg.InsecureSkipVerify = true
	return cfg
}

// bufSize is the read and write buffer size, in bytes, handed to the
// websocket dialer.
const bufSize = 65536

// ConnectStream dials a "wss" websocket to the given API path under the
// connector's model, with attrs encoded as the query string. The request
// carries the header built by utils.BasicAuthHeader from the agent name and
// password, plus an X-Juju-Nonce header when a nonce is configured.
//
// After the handshake the first message on the stream is read as an initial
// error report; if it carries an error the connection is closed and that
// error is returned. On success the open stream is returned to the caller.
func (s *streamConnector) ConnectStream(path string, attrs url.Values) (base.Stream, error) {
	endpoint := url.URL{
		Scheme:   "wss",
		Host:     s.Addr(),
		Path:     s.apiPath(path),
		RawQuery: attrs.Encode(),
	}

	header := utils.BasicAuthHeader(s.agent, s.password)
	if s.nonce != "" {
		header.Set("X-Juju-Nonce", s.nonce)
	}

	// Proxy is left at its zero value (nil), so no proxy function is set.
	stream, err := websocketDial(&websocket.Dialer{
		TLSClientConfig: s.tlsConfig(),
		ReadBufferSize:  bufSize,
		WriteBufferSize: bufSize,
	}, endpoint.String(), header)
	if err != nil {
		return nil, errors.Trace(err)
	}

	if err = readInitialStreamError(stream); err != nil {
		// The initial-response error is what matters to the caller; a
		// failure while closing is deliberately not reported.
		stream.Close()
		return nil, errors.Trace(err)
	}
	return stream, nil
}

// readInitialStreamError consumes the first message on ws, which must be a
// text message holding a newline-terminated JSON params.ErrorResult, and
// returns the error it carries (nil when the result has no error).
//
// The message is read through a bufio.Reader because a single Read on the
// websocket reader never crosses a frame boundary but may return less than
// the whole frame. ReadSlice is used instead of ReadBytes so that the
// accepted response is bounded by the bufio buffer (4096 bytes by default);
// anything longer fails the read.
func readInitialStreamError(ws base.Stream) error {
	kind, frame, err := ws.NextReader()
	switch {
	case err != nil:
		return errors.Annotate(err, "unable to get reader")
	case kind != websocket.TextMessage:
		return errors.Errorf("unexpected message type %v", kind)
	}

	raw, err := bufio.NewReader(frame).ReadSlice('\n')
	if err != nil {
		return errors.Annotate(err, "unable to read initial response")
	}

	var initial params.ErrorResult
	if err = json.Unmarshal(raw, &initial); err != nil {
		return errors.Annotate(err, "unable to unmarshal initial response")
	}
	if initial.Error == nil {
		// Return a literal nil rather than initial.Error itself: if Error
		// is a pointer type (presumably *params.Error), returning the nil
		// pointer as an error would yield a non-nil interface value.
		return nil
	}
	return initial.Error
}

// websocketDial dials urlStr with the given dialer and request header and
// returns the resulting connection as a base.Stream.
//
// When the dial fails with websocket.ErrBadHandshake, the HTTP response body
// is read and the returned error is "<trimmed body> (<status text>)". If
// reading the body fails, the original handshake error is returned
// unchanged. Any other dial error is returned as-is.
func websocketDial(dialer *websocket.Dialer, urlStr string, requestHeader http.Header) (base.Stream, error) {
	conn, resp, err := dialer.Dial(urlStr, requestHeader)
	if err == nil {
		return conn, nil
	}
	if err != websocket.ErrBadHandshake {
		return nil, err
	}

	// A bad handshake comes with the HTTP response; use its body and
	// status to build a more informative error.
	defer resp.Body.Close()
	body, readErr := ioutil.ReadAll(resp.Body)
	if readErr != nil {
		return nil, err
	}
	detail := strings.TrimSpace(string(body))
	status := http.StatusText(resp.StatusCode)
	return nil, errors.Errorf("%s (%s)", detail, status)
}

// chars is the alphabet from which the random two-letter message prefix is
// drawn.
const chars = "abcdefghijklmnopqrstuvwxyz"

// main parses the command-line flags, opens a log writer against the
// configured model, and then writes INFO log records as fast as it can for
// five seconds before printing how many were sent.
//
// Every message starts with a random two-letter prefix so that the output of
// one run can be told apart from another's.
func main() {
	gnuflag.Parse(true)

	writer, err := logsender.NewAPI(&streamConnector{
		modelUUID: *uuid,
		host:      *host,
		port:      *port,
		agent:     *agent,
		password:  *password,
		nonce:     *nonce,
	}).LogWriter()
	if err != nil {
		panic(err)
	}
	// The writer is left open on purpose (the Close call stays disabled):
	//writer.Close()

	start := time.Now()
	rand.Seed(int64(start.Nanosecond() + os.Getpid()))
	first := chars[rand.Intn(len(chars))]
	second := chars[rand.Intn(len(chars))]
	prefix := string([]byte{first, second})

	// TODO: To handle the Ping/Pong from gorilla websocket, something would
	// have to keep reading from the connection, e.g. a goroutine looping on
	// NextReader(). Even though no messages should ever come back to us,
	// the Ping/Pong handling is hidden inside the read call.
	sent := 0
	for ; time.Since(start).Seconds() < 5.0; sent++ {
		record := &params.LogRecord{
			Time:     time.Now(),
			Module:   "juju.user",
			Location: "here",
			Level:    "INFO",
			Message:  fmt.Sprintf("%s this is my fight song %d", prefix, sent),
			Entity:   *agent,
		}
		if err := writer.WriteLog(record); err != nil {
			panic(err)
		}
	}
	fmt.Printf("wrote %d messages in 5s (%.3f/s)\n", sent, float64(sent)/5.0)
}