1
0

Add smudge dependency to vendor

Smudge provides an implementation of the gossip protocol that will be
used for communicating with other chat client implementations.

Retrieved at: d39c17654b
This commit is contained in:
Tony Grosinger 2017-10-19 19:44:45 -07:00
parent 71159645d0
commit a1e0fb3b7e
Signed by: tgrosinger
GPG Key ID: 065559ACE0A9C69C
24 changed files with 3914 additions and 0 deletions

7
vendor/github.com/clockworksoul/smudge/Dockerfile generated vendored Normal file
View File

@ -0,0 +1,7 @@
FROM scratch
COPY tmp/smudge /smudge
EXPOSE 9999
CMD ["/smudge"]

202
vendor/github.com/clockworksoul/smudge/LICENSE generated vendored Normal file
View File

@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

186
vendor/github.com/clockworksoul/smudge/README.md generated vendored Normal file
View File

@ -0,0 +1,186 @@
# Smudge
[![GoDoc](https://godoc.org/github.com/clockworksoul/smudge?status.svg)](https://godoc.org/github.com/clockworksoul/smudge)
[![Build Status](https://travis-ci.org/clockworksoul/smudge.svg?branch=master)](https://travis-ci.org/clockworksoul/smudge)
[![Go Report Card](https://goreportcard.com/badge/github.com/clockworksoul/smudge)](https://goreportcard.com/report/github.com/clockworksoul/smudge)
<img src="https://github.com/clockworksoul/smudge/raw/master/logo/logo.png" width="150">
## Introduction
Smudge is a minimalist Go implementation of the [SWIM](https://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf) (Scalable Weakly-consistent Infection-style Membership) protocol for cluster node membership, status dissemination, and failure detection developed at Cornell University by Motivala, et al. It isn't a distributed data store in its own right, but rather a framework intended to facilitate the construction of such systems.
Smudge also extends the standard SWIM protocol so that in addition to the standard membership status functionality it also allows the transmission of broadcasts containing a small amount (256 bytes) of arbitrary content to all present healthy members. This maximum is related to the limit imposed on maximum safe UDP packet size by RFC 791 and RFC 2460. We recognize that some systems allow larger packets, however, and although that can risk fragmentation and dropped packets the maximum payload size is configurable.
Smudge was conceived with space-sensitive systems (mobile, IOT, containers) in mind, and therefore was developed with a minimalist philosophy of doing a few things well. As such, its feature set is relatively small and mostly limited to functionality around adding and removing nodes and detecting status changes on the cluster.
Complete documentation is available from [the associated Godoc](https://godoc.org/github.com/clockworksoul/smudge).
## Features
* Uses gossip (i.e., epidemic) protocol for dissemination, the latency of which grows logarithmically with the number of members.
* Low-bandwidth UDP-based failure detection and status dissemination.
* Imposes a constant message load per group member, regardless of the number of members.
* Member status changes are eventually detected by all non-faulty members of the cluster (strong completeness).
* Supports transmission of short (256 byte) broadcasts that are propagated at most once to all present, healthy members.
## Known issues
* Broadcasts are limited to 256 bytes.
* No WAN support: only local-network, private IPs are supported.
* No multicast discovery.
### Deviations from [Motivala, et al](https://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf)
* Dead nodes are not immediately removed, but are instead periodically re-tried (with exponential backoff) for a time before finally being removed.
* Smudge allows the transsion of short, arbitrary-content broadcasts to all healthy nodes.
## How to use
To use the code, you simply specify a few configuration options (or use the defaults), create and add a node status change listener, and call the `smudge.Begin()` function.
### Configuring the node with environment variables
Perhaps the simplest way of directing the behavior of the SWIM driver is by setting the appropriate system environment variables, which is useful when making use of Smudge inside of a container.
The following variables and their default values are as follows:
```
Variable | Default | Description
-------------------------- | ------- | -------------------------------
SMUDGE_HEARTBEAT_MILLIS | 250 | Milliseconds between heartbeats
SMUDGE_INITIAL_HOSTS | | Comma-delimmited list of known members as IP or IP:PORT.
SMUDGE_LISTEN_PORT | 9999 | UDP port to listen on
SMUDGE_MAX_BROADCAST_BYTES | 256 | Maximum byte length of broadcast payloads
```
### Configuring the node with API calls
If you prefer to direct the behavior of the service using the API, the calls are relatively straight-forward. Note that setting the application properties using this method overrides the behavior of environment variables.
```
smudge.SetListenPort(9999)
smudge.SetHeartbeatMillis(250)
smudge.SetMaxBroadcastBytes(256)
```
### Creating and adding a status change listener
Creating a status change listener is very straight-forward:
```
type MyStatusListener struct {
smudge.StatusListener
}
func (m MyStatusListener) OnChange(node *smudge.Node, status smudge.NodeStatus) {
fmt.Printf("Node %s is now status %s\n", node.Address(), status)
}
func main() {
smudge.AddStatusListener(MyStatusListener{})
}
```
### Creating and adding a broadcast listener
Adding a broadcast listener is very similar to creating a status listener:
```
type MyBroadcastListener struct {
smudge.BroadcastListener
}
func (m MyBroadcastListener) OnBroadcast(b *smudge.Broadcast) {
fmt.Printf("Received broadcast from %v: %s\n",
b.Origin().Address(),
string(b.Bytes()))
}
func main() {
smudge.AddBroadcastListener(MyBroadcastListener{})
}
```
### Adding a new member to the "known nodes" list
Adding a new member to your known nodes list will also make that node aware of the adding server. Note that because this package doesn't yet support multicast notifications, at this time to join an existing cluster you must use this method to add at least one of that cluster's healthy member nodes.
```
node, err := smudge.CreateNodeByAddress("localhost:10000")
if err == nil {
smudge.AddNode(node)
}
```
### Starting the server
Once everything else is done, starting the server is trivial:
Simply call: `smudge.Begin()`
### Transmitting a broadcast
To transmit a broadcast to all healthy nodes currenty in the cluster you can use one of the [`BroadcastBytes(bytes []byte)`](https://godoc.org/github.com/clockworksoul/smudge#BroadcastBytes) or [`BroadcastString(str string)`](https://godoc.org/github.com/clockworksoul/smudge#BroadcastString) functions.
Be aware of the following caveats:
* Attempting to send a broadcast before the server has been started will cause a panic.
* The broadcast _will not_ be received by the originating member; `BroadcastListener`s on the originating member will not be triggered.
* Nodes that join the cluster after the broadcast has been fully propagated will not receive the broadcast; nodes that join after the initial transmission but before complete proagation may or may not receive the broadcast.
### Getting a list of nodes
The [`AllNodes()`](https://godoc.org/github.com/clockworksoul/smudge#AllNodes) can be used to get all known nodes; [`HealthyNodes()`](https://godoc.org/github.com/clockworksoul/smudge#HealthyNodes) works similarly, but returns only healthy nodes (defined as nodes with a [status](https://godoc.org/github.com/clockworksoul/smudge#NodeStatus) of "alive").
### Everything in one place
```
package main
import "github.com/clockworksoul/smudge"
import "fmt"
type MyStatusListener struct {
smudge.StatusListener
}
func (m MyStatusListener) OnChange(node *smudge.Node, status smudge.NodeStatus) {
fmt.Printf("Node %s is now status %s\n", node.Address(), status)
}
type MyBroadcastListener struct {
smudge.BroadcastListener
}
func (m MyBroadcastListener) OnBroadcast(b *smudge.Broadcast) {
fmt.Printf("Received broadcast from %s: %s\n",
b.Origin().Address(),
string(b.Bytes()))
}
func main() {
heartbeatMillis := 500
listenPort := 9999
// Set configuration options
smudge.SetListenPort(listenPort)
smudge.SetHeartbeatMillis(heartbeatMillis)
// Add the status listener
smudge.AddStatusListener(MyStatusListener{})
// Add the broadcast listener
smudge.AddBroadcastListener(MyBroadcastListener{})
// Add a new remote node. Currently, to join an existing cluster you must
// add at least one of its healthy member nodes.
node, err := smudge.CreateNodeByAddress("localhost:10000")
if err == nil {
smudge.AddNode(node)
}
// Start the server!
smudge.Begin()
}
```

314
vendor/github.com/clockworksoul/smudge/broadcast.go generated vendored Normal file
View File

@ -0,0 +1,314 @@
/*
Copyright 2016 The Smudge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package smudge
import (
"errors"
"fmt"
"net"
"sort"
"sync"
)
const (
// Emit counters for broadcasts can be less than 0. We transmit positive
// numbers, and decrement all the others. At this value, the broadcast
// is removed from the map all together. This ensures broadcasts are
// emitted briefly, but retained long enough to not be received twice.
broadcastRemoveValue int8 = int8(-100)
)
// The index counter value for the next broadcast message
var indexCounter uint32 = 1
// Emitted broadcasts. Once they are added here, the membership machinery will
// pick them up and piggyback them onto standard messages.
var broadcasts = struct {
sync.RWMutex
m map[string]*Broadcast
}{m: make(map[string]*Broadcast)}
// Broadcast represents a packet of bytes emitted across the cluster on top of
// the status update infrastructure. Although useful, its payload is limited
// to only 256 bytes.
type Broadcast struct {
bytes []byte
origin *Node
index uint32
label string
emitCounter int8
}
// Bytes returns a copy of this broadcast's bytes. Manipulating the contents
// of this slice will not be reflected in the contents of the broadcast.
func (b *Broadcast) Bytes() []byte {
length := len(b.bytes)
bytesCopy := make([]byte, length, length)
copy(bytesCopy, b.bytes)
return bytesCopy
}
// Index returns the origin message index for this broadcast. This value is
// incremented for each broadcast. The combination of
// originIP:originPort:Index is unique.
func (b *Broadcast) Index() uint32 {
return b.index
}
// Label returns a unique label string composed of originIP:originPort:Index.
func (b *Broadcast) Label() string {
if b.label == "" {
b.label = fmt.Sprintf("%s:%d:%d",
b.origin.ip.String(),
b.origin.port,
b.index)
}
return b.label
}
// Origin returns the node that this broadcast originated from.
func (b *Broadcast) Origin() *Node {
return b.origin
}
// BroadcastBytes allows a user to emit a short broadcast in the form of a byte
// slice, which will be transmitted at most once to all other healthy current
// members. Members that join after the broadcast has already propagated
// through the cluster will not receive the message. The maximum broadcast
// length is 256 bytes.
func BroadcastBytes(bytes []byte) error {
if len(bytes) > GetMaxBroadcastBytes() {
emsg := fmt.Sprintf(
"broadcast payload length exceeds %d bytes",
GetMaxBroadcastBytes())
return errors.New(emsg)
}
broadcasts.Lock()
bcast := Broadcast{
origin: thisHost,
index: indexCounter,
bytes: bytes,
emitCounter: int8(emitCount())}
broadcasts.m[bcast.Label()] = &bcast
indexCounter++
broadcasts.Unlock()
return nil
}
// BroadcastString allows a user to emit a short broadcast in the form of a
// string, which will be transmitted at most once to all other healthy current
// members. Members that join after the broadcast has already propagated
// through the cluster will not receive the message. The maximum broadcast
// length is 256 bytes.
func BroadcastString(str string) error {
return BroadcastBytes([]byte(str))
}
// Message contents
// Bytes Content
// ------------------------
// Bytes 00-03 Origin IP
// Bytes 04-05 Origin response port
// Bytes 06-09 Origin broadcast counter
// Bytes 10-11 Payload length (bytes)
// Bytes 12-NN Payload
func (b *Broadcast) encode() []byte {
size := 12 + len(b.bytes)
bytes := make([]byte, size, size)
// Index pointer
p := 0
// Bytes 00-03: Origin IP
ip := b.origin.IP()
for i := 0; i < 4; i++ {
bytes[p+i] = ip[i]
}
p += 4
// Bytes 04-05 Origin response port
p += encodeUint16(b.origin.Port(), bytes, p)
// Bytes 06-09 Origin broadcast counter
p += encodeUint32(b.index, bytes, p)
// Bytes 10-11 Payload length (bytes)
p += encodeUint16(uint16(len(b.bytes)), bytes, p)
// Bytes 12-NN Payload
for i, by := range b.bytes {
bytes[i+p] = by
}
if bytes[0] == 0 {
panic("Sending empty broadcast")
}
return bytes
}
// Message contents
// Bytes Content
// ------------------------
// Bytes 00-03 Origin IP
// Bytes 04-05 Origin response port
// Bytes 06-09 Origin broadcast counter
// Bytes 10-11 Payload length (bytes)
// Bytes 12-NN Payload
func decodeBroadcast(bytes []byte) (*Broadcast, error) {
var index uint32
var port uint16
var ip net.IP
var length uint16
// An index pointer
p := 0
// Bytes 00-03 Origin IP
ip = net.IPv4(
bytes[p+0],
bytes[p+1],
bytes[p+2],
bytes[p+3]).To4()
p += 4
// Bytes 04-05 Origin response port
port, p = decodeUint16(bytes, p)
// Bytes 06-09 Origin broadcast counter
index, p = decodeUint32(bytes, p)
// Bytes 10-11 Payload length (bytes)
length, p = decodeUint16(bytes, p)
// Now that we have the IP and port, we can find the Node.
origin := knownNodes.getByIP(ip.To4(), port)
// We don't know this node, so create a new one!
if origin == nil {
origin, _ = CreateNodeByIP(ip.To4(), port)
}
bcast := Broadcast{
origin: origin,
index: index,
bytes: bytes[p : p+int(length)],
emitCounter: int8(emitCount())}
if origin.IP()[0] == 0 || origin.Port() == 0 {
logWarn("Received originless broadcast")
return &bcast,
errors.New("received originless broadcast")
}
if int(length) > GetMaxBroadcastBytes() {
return &bcast,
errors.New("message length exceeds maximum length")
}
return &bcast, nil
}
// getBroadcastToEmit identifies the single known broadcast with the highest
// emitCounter value (which can be negative), and returns it. If multiple
// broadcasts have the same value, one is arbitrarily chosen.
func getBroadcastToEmit() *Broadcast {
// Get all broadcast messages.
values := make([]*Broadcast, 0, 0)
broadcasts.RLock()
for _, v := range broadcasts.m {
values = append(values, v)
}
broadcasts.RUnlock()
// Remove all overly-emitted messages from the list
broadcastSlice := make([]*Broadcast, 0, 0)
broadcasts.Lock()
for _, b := range values {
if b.emitCounter <= broadcastRemoveValue {
logDebug("Removing", b.Label(), "from recently updated list")
delete(broadcasts.m, b.Label())
} else {
broadcastSlice = append(broadcastSlice, b)
}
}
broadcasts.Unlock()
if len(broadcastSlice) > 0 {
// Put the newest nodes on top.
sort.Sort(byBroadcastEmitCounter(broadcastSlice))
return broadcastSlice[0]
}
return nil
}
// receiveBroadcast is called by receiveMessageUDP when a broadcast payload
// is found in a message.
func receiveBroadcast(broadcast *Broadcast) {
if broadcast == nil {
return
}
if broadcast.Origin().IP()[0] == 0 || broadcast.Origin().Port() == 0 {
logWarn("Received originless broadcast")
return
}
label := broadcast.Label()
broadcasts.Lock()
_, contains := broadcasts.m[label]
if !contains {
broadcasts.m[label] = broadcast
}
broadcasts.Unlock()
if !contains {
logfInfo("Broadcast [%s]=%s\n",
label,
string(broadcast.Bytes()))
doBroadcastUpdate(broadcast)
}
}
// byBroadcastEmitCounter implements sort.Interface for []*Broadcast based on
// the emitCounter field.
type byBroadcastEmitCounter []*Broadcast
func (a byBroadcastEmitCounter) Len() int {
return len(a)
}
func (a byBroadcastEmitCounter) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}
func (a byBroadcastEmitCounter) Less(i, j int) bool {
return a[i].emitCounter > a[j].emitCounter
}

19
vendor/github.com/clockworksoul/smudge/build.sh generated vendored Executable file
View File

@ -0,0 +1,19 @@
#!/bin/bash
set -eux
GO_VERSION=1.7
# Builds the binary in a go container, and drops the Linux-compatible binary in $PWD/bin
docker run --rm \
-v "$PWD":/go/src/github.com/clockworksoul/smudge \
-v "$PWD/tmp":/go/bin \
-w /go/bin \
-e "CGO_ENABLED=0" \
-e "GOOS=linux" \
golang:${GO_VERSION} \
go build -a -installsuffix cgo -v github.com/clockworksoul/smudge/smudge
docker build -t clockworksoul/smudge:latest .
sudo rm -R tmp

109
vendor/github.com/clockworksoul/smudge/bytes.go generated vendored Normal file
View File

@ -0,0 +1,109 @@
/*
Copyright 2016 The Smudge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This file contains a variety of methods to encode/decode various primitive
// data types into a byte slice using a fixed length encoding scheme. We
// chose fixed lengths (as opposed to the variable length encoding provided by
// the encoding/binary package) because it makes decoding easier. It's also
// (very slightly) more efficient.
//
// These functions are used mostly by the components defined in message.go.
package smudge
func decodeByte(bytes []byte, startIndex int) (byte, int) {
return bytes[startIndex], startIndex + 1
}
func decodeUint8(bytes []byte, startIndex int) (uint8, int) {
n, i := decodeByte(bytes, startIndex)
return byte(n), i
}
func decodeUint16(bytes []byte, startIndex int) (uint16, int) {
var number uint16
number = uint16(bytes[startIndex+1])<<8 |
uint16(bytes[startIndex+0])
return number, startIndex + 2
}
func decodeUint32(bytes []byte, startIndex int) (uint32, int) {
var number uint32
number = uint32(bytes[startIndex+3])<<24 |
uint32(bytes[startIndex+2])<<16 |
uint32(bytes[startIndex+1])<<8 |
uint32(bytes[startIndex+0])
return number, startIndex + 4
}
func decodeUint64(bytes []byte, startIndex int) (uint64, int) {
var number uint64
number = uint64(bytes[startIndex+7])<<56 |
uint64(bytes[startIndex+6])<<48 |
uint64(bytes[startIndex+5])<<40 |
uint64(bytes[startIndex+4])<<32 |
uint64(bytes[startIndex+3])<<24 |
uint64(bytes[startIndex+2])<<16 |
uint64(bytes[startIndex+1])<<8 |
uint64(bytes[startIndex+0])
return number, startIndex + 8
}
func encodeByte(number byte, bytes []byte, startIndex int) int {
bytes[startIndex+0] = number
return 1
}
func encodeUint8(number uint8, bytes []byte, startIndex int) int {
return encodeByte(byte(number), bytes, startIndex)
}
func encodeUint16(number uint16, bytes []byte, startIndex int) int {
bytes[startIndex+0] = byte(number)
bytes[startIndex+1] = byte(number >> 8)
return 2
}
func encodeUint32(number uint32, bytes []byte, startIndex int) int {
bytes[startIndex+0] = byte(number)
bytes[startIndex+1] = byte(number >> 8)
bytes[startIndex+2] = byte(number >> 16)
bytes[startIndex+3] = byte(number >> 24)
return 4
}
func encodeUint64(number uint64, bytes []byte, startIndex int) int {
bytes[startIndex+0] = byte(number)
bytes[startIndex+1] = byte(number >> 8)
bytes[startIndex+2] = byte(number >> 16)
bytes[startIndex+3] = byte(number >> 24)
bytes[startIndex+4] = byte(number >> 32)
bytes[startIndex+5] = byte(number >> 40)
bytes[startIndex+6] = byte(number >> 48)
bytes[startIndex+7] = byte(number >> 56)
return 8
}

225
vendor/github.com/clockworksoul/smudge/bytes_test.go generated vendored Normal file
View File

@ -0,0 +1,225 @@
/*
Copyright 2016 The Smudge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package smudge
import (
"testing"
)
func TestEncodeDecodeUint16A(t *testing.T) {
bytes := make([]byte, 2, 2)
initial := uint16(0x0000)
encodeUint16(initial, bytes, 0)
backAgain, p := decodeUint16(bytes, 0)
if p != 2 {
t.Fail()
}
if initial != backAgain {
t.Errorf("%d != %d", initial, backAgain)
}
}
func TestEncodeDecodeUint16B(t *testing.T) {
bytes := make([]byte, 2, 2)
initial := uint16(0x048C)
encodeUint16(initial, bytes, 0)
backAgain, p := decodeUint16(bytes, 0)
if p != 2 {
t.Fail()
}
if initial != backAgain {
t.Errorf("%d != %d", initial, backAgain)
}
}
func TestEncodeDecodeUint16C(t *testing.T) {
bytes := make([]byte, 2, 2)
initial := uint16(0x159D)
encodeUint16(initial, bytes, 0)
backAgain, p := decodeUint16(bytes, 0)
if p != 2 {
t.Fail()
}
if initial != backAgain {
t.Errorf("%d != %d", initial, backAgain)
}
}
func TestEncodeDecodeUint16D(t *testing.T) {
bytes := make([]byte, 2, 2)
initial := uint16(0xFFFF)
encodeUint16(initial, bytes, 0)
backAgain, p := decodeUint16(bytes, 0)
if p != 2 {
t.Fail()
}
if initial != backAgain {
t.Errorf("%d != %d", initial, backAgain)
}
}
func TestEncodeDecodeUint32A(t *testing.T) {
bytes := make([]byte, 4, 4)
initial := uint32(0x00000000)
encodeUint32(initial, bytes, 0)
backAgain, p := decodeUint32(bytes, 0)
if p != 4 {
t.Fail()
}
if initial != backAgain {
t.Errorf("%d != %d", initial, backAgain)
}
}
func TestEncodeDecodeUint32B(t *testing.T) {
bytes := make([]byte, 4, 4)
initial := uint32(0x02468ACE)
encodeUint32(initial, bytes, 0)
backAgain, p := decodeUint32(bytes, 0)
if p != 4 {
t.Fail()
}
if initial != backAgain {
t.Errorf("%d != %d", initial, backAgain)
}
}
func TestEncodeDecodeUint32C(t *testing.T) {
bytes := make([]byte, 4, 4)
initial := uint32(0x13579BDF)
encodeUint32(initial, bytes, 0)
backAgain, p := decodeUint32(bytes, 0)
if p != 4 {
t.Fail()
}
if initial != backAgain {
t.Errorf("%d != %d", initial, backAgain)
}
}
func TestEncodeDecodeUint32D(t *testing.T) {
bytes := make([]byte, 4, 4)
initial := uint32(0xFFFFFFFF)
encodeUint32(initial, bytes, 0)
backAgain, p := decodeUint32(bytes, 0)
if p != 4 {
t.Fail()
}
if initial != backAgain {
t.Errorf("%d != %d", initial, backAgain)
}
}
func TestEncodeDecodeUint64A(t *testing.T) {
bytes := make([]byte, 8, 8)
initial := uint64(0x000000000000000)
encodeUint64(initial, bytes, 0)
backAgain, p := decodeUint64(bytes, 0)
if p != 8 {
t.Fail()
}
if initial != backAgain {
t.Errorf("%d != %d", initial, backAgain)
}
}
func TestEncodeDecodeUint64B(t *testing.T) {
bytes := make([]byte, 8, 8)
initial := uint64(0x02468ACE02468ACE)
encodeUint64(initial, bytes, 0)
backAgain, p := decodeUint64(bytes, 0)
if p != 8 {
t.Fail()
}
if initial != backAgain {
t.Errorf("%d != %d", initial, backAgain)
}
}
func TestEncodeDecodeUint64C(t *testing.T) {
bytes := make([]byte, 8, 8)
initial := uint64(0x13579BDF13579BDF)
encodeUint64(initial, bytes, 0)
backAgain, p := decodeUint64(bytes, 0)
if p != 8 {
t.Fail()
}
if initial != backAgain {
t.Errorf("%d != %d", initial, backAgain)
}
}
func TestEncodeDecodeUint64D(t *testing.T) {
bytes := make([]byte, 8, 8)
initial := uint64(0xFFFFFFFFFFFFFFFF)
encodeUint64(initial, bytes, 0)
backAgain, p := decodeUint64(bytes, 0)
if p != 8 {
t.Fail()
}
if initial != backAgain {
t.Errorf("%d != %d", initial, backAgain)
}
}

View File

@ -0,0 +1,253 @@
version: '2'
services:
fish001:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish021,fish022,fish023,fish024,fish025,fish026,fish027,fish028,fish029,fish030
networks:
- smudgeNetwork
fish002:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish021,fish022,fish023,fish024,fish025,fish026,fish027,fish028,fish029,fish030
networks:
- smudgeNetwork
fish003:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish021,fish022,fish023,fish024,fish025,fish026,fish027,fish028,fish029,fish030
networks:
- smudgeNetwork
fish004:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish021,fish022,fish023,fish024,fish025,fish026,fish027,fish028,fish029,fish030
networks:
- smudgeNetwork
fish005:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish021,fish022,fish023,fish024,fish025,fish026,fish027,fish028,fish029,fish030
networks:
- smudgeNetwork
fish006:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish021,fish022,fish023,fish024,fish025,fish026,fish027,fish028,fish029,fish030
networks:
- smudgeNetwork
fish007:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish021,fish022,fish023,fish024,fish025,fish026,fish027,fish028,fish029,fish030
networks:
- smudgeNetwork
fish008:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish021,fish022,fish023,fish024,fish025,fish026,fish027,fish028,fish029,fish030
networks:
- smudgeNetwork
fish009:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish021,fish022,fish023,fish024,fish025,fish026,fish027,fish028,fish029,fish030
networks:
- smudgeNetwork
fish010:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish021,fish022,fish023,fish024,fish025,fish026,fish027,fish028,fish029,fish030
networks:
- smudgeNetwork
fish011:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish001,fish002,fish003,fish004,fish005,fish006,fish007,fish008,fish009,fish010
networks:
- smudgeNetwork
fish012:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish001,fish002,fish003,fish004,fish005,fish006,fish007,fish008,fish009,fish010
networks:
- smudgeNetwork
fish013:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish001,fish002,fish003,fish004,fish005,fish006,fish007,fish008,fish009,fish010
networks:
- smudgeNetwork
fish014:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish001,fish002,fish003,fish004,fish005,fish006,fish007,fish008,fish009,fish010
networks:
- smudgeNetwork
fish015:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish001,fish002,fish003,fish004,fish005,fish006,fish007,fish008,fish009,fish010
networks:
- smudgeNetwork
fish016:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish001,fish002,fish003,fish004,fish005,fish006,fish007,fish008,fish009,fish010
networks:
- smudgeNetwork
fish017:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish001,fish002,fish003,fish004,fish005,fish006,fish007,fish008,fish009,fish010
networks:
- smudgeNetwork
fish018:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish001,fish002,fish003,fish004,fish005,fish006,fish007,fish008,fish009,fish010
networks:
- smudgeNetwork
fish019:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish001,fish002,fish003,fish004,fish005,fish006,fish007,fish008,fish009,fish010
networks:
- smudgeNetwork
fish020:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish001,fish002,fish003,fish004,fish005,fish006,fish007,fish008,fish009,fish010
networks:
- smudgeNetwork
fish020:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish011,fish012,fish013,fish014,fish015,fish016,fish017,fish018,fish019,fish020
networks:
- smudgeNetwork
fish021:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish011,fish012,fish013,fish014,fish015,fish016,fish017,fish018,fish019,fish020
networks:
- smudgeNetwork
fish022:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish011,fish012,fish013,fish014,fish015,fish016,fish017,fish018,fish019,fish020
networks:
- smudgeNetwork
fish023:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish011,fish012,fish013,fish014,fish015,fish016,fish017,fish018,fish019,fish020
networks:
- smudgeNetwork
fish024:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish011,fish012,fish013,fish014,fish015,fish016,fish017,fish018,fish019,fish020
networks:
- smudgeNetwork
fish025:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish011,fish012,fish013,fish014,fish015,fish016,fish017,fish018,fish019,fish020
networks:
- smudgeNetwork
fish026:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish011,fish012,fish013,fish014,fish015,fish016,fish017,fish018,fish019,fish020
networks:
- smudgeNetwork
fish027:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish011,fish012,fish013,fish014,fish015,fish016,fish017,fish018,fish019,fish020
networks:
- smudgeNetwork
fish028:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish011,fish012,fish013,fish014,fish015,fish016,fish017,fish018,fish019,fish020
networks:
- smudgeNetwork
fish029:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish011,fish012,fish013,fish014,fish015,fish016,fish017,fish018,fish019,fish020
networks:
- smudgeNetwork
fish030:
image: clockworksoul/smudge:latest
command: "/smudge"
environment:
SMUDGE_INITIAL_HOSTS: fish011,fish012,fish013,fish014,fish015,fish016,fish017,fish018,fish019,fish020
networks:
- smudgeNetwork
networks:
smudgeNetwork:

81
vendor/github.com/clockworksoul/smudge/events.go generated vendored Normal file
View File

@ -0,0 +1,81 @@
/*
Copyright 2016 The Smudge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package smudge
import "sync"
var broadcastListeners = struct {
sync.RWMutex
s []BroadcastListener
}{s: make([]BroadcastListener, 0, 16)}
var statusListeners = struct {
sync.RWMutex
s []StatusListener
}{s: make([]StatusListener, 0, 16)}
// BroadcastListener is the interface that must be implemented to take advantage
// of the cluster member status update notification functionality provided by
// the AddBroadcastListener() function.
type BroadcastListener interface {
// The OnChange() function is called whenever the node is notified of any
// change in the status of a cluster member.
OnBroadcast(broadcast *Broadcast)
}
// AddBroadcastListener allows the submission of a BroadcastListener implementation
// whose OnChange() function will be called whenever the node is notified of any
// change in the status of a cluster member.
func AddBroadcastListener(listener BroadcastListener) {
broadcastListeners.Lock()
broadcastListeners.s = append(broadcastListeners.s, listener)
broadcastListeners.Unlock()
}
func doBroadcastUpdate(broadcast *Broadcast) {
broadcastListeners.RLock()
for _, sl := range broadcastListeners.s {
sl.OnBroadcast(broadcast)
}
broadcastListeners.RUnlock()
}
// StatusListener is the interface that must be implemented to take advantage
// of the cluster member status update notification functionality provided by
// the AddStatusListener() function.
type StatusListener interface {
// The OnChange() function is called whenever the node is notified of any
// change in the status of a cluster member.
OnChange(node *Node, status NodeStatus)
}
// AddStatusListener allows the submission of a StatusListener implementation
// whose OnChange() function will be called whenever the node is notified of any
// change in the status of a cluster member.
func AddStatusListener(listener StatusListener) {
statusListeners.Lock()
statusListeners.s = append(statusListeners.s, listener)
statusListeners.Unlock()
}
func doStatusUpdate(node *Node, status NodeStatus) {
statusListeners.RLock()
for _, sl := range statusListeners.s {
sl.OnChange(node, status)
}
statusListeners.RUnlock()
}

156
vendor/github.com/clockworksoul/smudge/log.go generated vendored Normal file
View File

@ -0,0 +1,156 @@
/*
Copyright 2016 The Smudge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package smudge
import (
"fmt"
"os"
"time"
)
// LogLevel represents a logging levels to be used as a parameter passed to
// the SetLogThreshhold() function.
type LogLevel byte
const (
// LogAll allows all log output of all levels to be emitted.
LogAll LogLevel = iota
// LogTrace restricts log output to trace level and above.
LogTrace
// LogDebug restricts log output to debug level and above.
LogDebug
// LogInfo restricts log output to info level and above.
LogInfo
// LogWarn restricts log output to warn level and above.
LogWarn
// LogError restricts log output to error level and above.
LogError
// LogFatal restricts log output to fatal level.
LogFatal
// LogOff prevents all log output entirely.
LogOff
)
var logThreshhold = LogInfo
func (s LogLevel) String() string {
switch s {
case LogAll:
return "All"
case LogTrace:
return "Trace"
case LogDebug:
return "Debug"
case LogInfo:
return "Info"
case LogWarn:
return "Warn"
case LogError:
return "Error"
case LogFatal:
return "Fatal"
case LogOff:
return "Off"
default:
return "Unknown"
}
}
// SetLogThreshold allows the output noise level to be adjusted by setting
// the logging priority threshold.
func SetLogThreshold(level LogLevel) {
logThreshhold = level
}
func prefix(level LogLevel) string {
f := time.Now().Format("02/Jan/2006:15:04:05 MST")
return fmt.Sprintf("%5s %s -", level.String(), f)
}
func log(level LogLevel, a ...interface{}) (n int, err error) {
if level >= logThreshhold {
fmt.Fprint(os.Stdout, prefix(level)+" ")
return fmt.Fprintln(os.Stdout, a...)
}
return 0, nil
}
func logTrace(a ...interface{}) (n int, err error) {
return log(LogTrace, a...)
}
func logDebug(a ...interface{}) (n int, err error) {
return log(LogDebug, a...)
}
func logInfo(a ...interface{}) (n int, err error) {
return log(LogInfo, a...)
}
func logWarn(a ...interface{}) (n int, err error) {
return log(LogWarn, a...)
}
func logError(a ...interface{}) (n int, err error) {
return log(LogError, a...)
}
func logFatal(a ...interface{}) (n int, err error) {
return log(LogFatal, a...)
}
func logf(level LogLevel, format string, a ...interface{}) (n int, err error) {
if level >= logThreshhold {
return fmt.Fprintf(os.Stdout, prefix(level)+" "+format, a...)
}
return 0, nil
}
func logfTrace(format string, a ...interface{}) (n int, err error) {
return logf(LogTrace, format, a...)
}
func logfDebug(format string, a ...interface{}) (n int, err error) {
return logf(LogDebug, format, a...)
}
func logfInfo(format string, a ...interface{}) (n int, err error) {
return logf(LogInfo, format, a...)
}
func logfWarn(format string, a ...interface{}) (n int, err error) {
return logf(LogWarn, format, a...)
}
func logfError(format string, a ...interface{}) (n int, err error) {
return logf(LogError, format, a...)
}
func logfFatal(format string, a ...interface{}) (n int, err error) {
return logf(LogFatal, format, a...)
}

BIN
vendor/github.com/clockworksoul/smudge/logo/logo.png generated vendored Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 42 KiB

620
vendor/github.com/clockworksoul/smudge/membership.go generated vendored Normal file
View File

@ -0,0 +1,620 @@
/*
Copyright 2016 The Smudge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package smudge
import (
"math"
"net"
"strconv"
"sync"
"time"
)
// A scalar value used to calculate a variety of limits
const lambda = 2.5
// How many standard deviations beyond the mean PING/ACK response time we
// allow before timing out an ACK.
const timeoutToleranceSigmas = 3.0
var currentHeartbeat uint32
var pendingAcks = struct {
sync.RWMutex
m map[string]*pendingAck
}{m: make(map[string]*pendingAck)}
var thisHostAddress string
var thisHost *Node
// This flag is set whenever a known node is added or removed.
var knownNodesModifiedFlag = false
var pingdata = newPingData(150, 50)
/******************************************************************************
* Exported functions (for public consumption)
*****************************************************************************/
// Begin starts the server by opening a UDP port and beginning the heartbeat.
// Note that this is a blocking function, so act appropriately.
func Begin() {
// Add this host.
ip, err := GetLocalIP()
if err != nil {
logFatal("Could not get local ip:", err)
return
}
if ip == nil {
logWarn("Warning: Could not resolve host IP. Using 127.0.0.1")
ip = []byte{127, 0, 0, 1}
}
me := Node{
ip: ip,
port: uint16(GetListenPort()),
timestamp: GetNowInMillis(),
pingMillis: PingNoData,
}
thisHostAddress = me.Address()
thisHost = &me
logInfo("My host address:", thisHostAddress)
go listenUDP(GetListenPort())
// Add this node's status. Don't update any other node's statuses: they'll
// report those back to us.
updateNodeStatus(thisHost, StatusAlive, 0)
AddNode(thisHost)
// Add initial hosts as specified by the SMUDGE_INITIAL_HOSTS property
for _, address := range GetInitialHosts() {
n, err := CreateNodeByAddress(address)
if err != nil {
logfError("Could not create node %s: %v\n", address, err)
} else {
AddNode(n)
}
}
go startTimeoutCheckLoop()
// Loop over a randomized list of all known nodes (except for this host
// node), pinging one at a time. If the knownNodesModifiedFlag is set to
// true by AddNode() or RemoveNode(), the we get a fresh list and start
// again.
for {
var randomAllNodes = knownNodes.getRandomNodes(0, thisHost)
var pingCounter int
for _, node := range randomAllNodes {
// Exponential backoff of dead nodes, until such time as they are removed.
if node.status == StatusDead {
var dnc *deadNodeCounter
var ok bool
deadNodeRetries.Lock()
if dnc, ok = deadNodeRetries.m[node.Address()]; !ok {
dnc = &deadNodeCounter{retry: 1, retryCountdown: 2}
deadNodeRetries.m[node.Address()] = dnc
}
deadNodeRetries.Unlock()
dnc.retryCountdown--
if dnc.retryCountdown <= 0 {
dnc.retry++
dnc.retryCountdown = int(math.Pow(2.0, float64(dnc.retry)))
if dnc.retry > maxDeadNodeRetries {
logDebug("Forgetting dead node", node.Address())
deadNodeRetries.Lock()
delete(deadNodeRetries.m, node.Address())
deadNodeRetries.Unlock()
RemoveNode(node)
continue
}
} else {
continue
}
}
currentHeartbeat++
logfDebug("%d - hosts=%d (announce=%d forward=%d)\n",
currentHeartbeat,
len(randomAllNodes),
emitCount(),
pingRequestCount())
PingNode(node)
pingCounter++
time.Sleep(time.Millisecond * time.Duration(GetHeartbeatMillis()))
if knownNodesModifiedFlag {
knownNodesModifiedFlag = false
break
}
}
if pingCounter == 0 {
logDebug("No nodes to ping. So lonely. :(")
time.Sleep(time.Millisecond * time.Duration(GetHeartbeatMillis()))
}
}
}
// PingNode can be used to explicitly ping a node. Calls the low-level
// doPingNode(), and outputs a message (and returns an error) if it fails.
func PingNode(node *Node) error {
err := transmitVerbPingUDP(node, currentHeartbeat)
if err != nil {
logInfo("Failure to ping", node, "->", err)
}
return err
}
/******************************************************************************
* Private functions (for internal use only)
*****************************************************************************/
// The number of times any node's new status should be emitted after changes.
// Currently set to (lambda * log(node count)).
func emitCount() int {
logn := math.Log(float64(knownNodes.length()))
mult := (lambda * logn) + 0.5
return int(mult)
}
func doForwardOnTimeout(pack *pendingAck) {
filteredNodes := getTargetNodes(pingRequestCount(), thisHost, pack.node)
if len(filteredNodes) == 0 {
logDebug(thisHost.Address(), "Cannot forward ping request: no more nodes")
updateNodeStatus(pack.node, StatusDead, currentHeartbeat)
} else {
for i, n := range filteredNodes {
logfDebug("(%d/%d) Requesting indirect ping of %s via %s\n",
i+1,
len(filteredNodes),
pack.node.Address(),
n.Address())
transmitVerbForwardUDP(n, pack.node, currentHeartbeat)
}
}
}
// Returns a random slice of valid ping/forward request targets; i.e., not
// this node, and not dead.
func getTargetNodes(count int, exclude ...*Node) []*Node {
randomNodes := knownNodes.getRandomNodes(0, exclude...)
filteredNodes := make([]*Node, 0, count)
for _, n := range randomNodes {
if len(filteredNodes) >= count {
break
}
if n.status == StatusDead {
continue
}
filteredNodes = append(filteredNodes, n)
}
return filteredNodes
}
func listenUDP(port int) error {
listenAddress, err := net.ResolveUDPAddr("udp", ":"+strconv.FormatInt(int64(port), 10))
if err != nil {
return err
}
/* Now listen at selected port */
c, err := net.ListenUDP("udp", listenAddress)
if err != nil {
return err
}
defer c.Close()
for {
buf := make([]byte, 512)
n, addr, err := c.ReadFromUDP(buf)
if err != nil {
logError("UDP read error: ", err)
}
go func(addr *net.UDPAddr, msg []byte) {
err = receiveMessageUDP(addr, buf[0:n])
if err != nil {
logError(err)
}
}(addr, buf[0:n])
}
}
// The number of nodes to send a PINGREQ to when a PING times out.
// Currently set to (lambda * log(node count)).
func pingRequestCount() int {
logn := math.Log(float64(knownNodes.length()))
mult := (lambda * logn) + 0.5
return int(mult)
}
func receiveMessageUDP(addr *net.UDPAddr, msgBytes []byte) error {
msg, err := decodeMessage(addr.IP, msgBytes)
if err != nil {
return err
}
logfTrace("Got %v from %v code=%d\n",
msg.verb,
msg.sender.Address(),
msg.senderHeartbeat)
// Synchronize heartbeats
if msg.senderHeartbeat > 0 && msg.senderHeartbeat-1 > currentHeartbeat {
logfTrace("Heartbeat advanced from %d to %d\n",
currentHeartbeat,
msg.senderHeartbeat-1)
currentHeartbeat = msg.senderHeartbeat - 1
}
updateStatusesFromMessage(msg)
receiveBroadcast(msg.broadcast)
// Handle the verb.
switch msg.verb {
case verbPing:
err = receiveVerbPingUDP(msg)
case verbAck:
err = receiveVerbAckUDP(msg)
case verbPingRequest:
err = receiveVerbForwardUDP(msg)
case verbNonForwardingPing:
err = receiveVerbNonForwardPingUDP(msg)
}
if err != nil {
return err
}
return nil
}
func receiveVerbAckUDP(msg message) error {
key := msg.sender.Address() + ":" + strconv.FormatInt(int64(msg.senderHeartbeat), 10)
pendingAcks.RLock()
_, ok := pendingAcks.m[key]
pendingAcks.RUnlock()
if ok {
msg.sender.Touch()
pendingAcks.Lock()
if pack, ok := pendingAcks.m[key]; ok {
// If this is a response to a requested ping, respond to the
// callback node
if pack.callback != nil {
go transmitVerbAckUDP(pack.callback, pack.callbackCode)
} else {
// Note the ping response time.
notePingResponseTime(pack)
}
}
delete(pendingAcks.m, key)
pendingAcks.Unlock()
}
return nil
}
func notePingResponseTime(pack *pendingAck) {
// Note the elapsed time
elapsedMillis := pack.elapsed()
pack.node.pingMillis = int(elapsedMillis)
// For the purposes of timeout tolerance, we treat all pings less than
// 10 as 10.
if elapsedMillis < 10 {
elapsedMillis = 10
}
pingdata.add(elapsedMillis)
mean, stddev := pingdata.data()
sigmas := pingdata.nSigma(timeoutToleranceSigmas)
logfTrace("Got ACK in %dms (mean=%.02f stddev=%.02f sigmas=%.02f)\n",
elapsedMillis,
mean,
stddev,
sigmas)
}
func receiveVerbForwardUDP(msg message) error {
// We don't forward to a node that we don't know.
if len(msg.members) >= 0 &&
msg.members[0].status == StatusForwardTo {
member := msg.members[0]
node := member.node
code := member.heartbeat
key := node.Address() + ":" + strconv.FormatInt(int64(code), 10)
pack := pendingAck{
node: node,
startTime: GetNowInMillis(),
callback: msg.sender,
callbackCode: code,
packType: packNFP}
pendingAcks.Lock()
pendingAcks.m[key] = &pack
pendingAcks.Unlock()
return transmitVerbGenericUDP(node, nil, verbNonForwardingPing, code)
}
return nil
}
func receiveVerbPingUDP(msg message) error {
return transmitVerbAckUDP(msg.sender, msg.senderHeartbeat)
}
func receiveVerbNonForwardPingUDP(msg message) error {
return transmitVerbAckUDP(msg.sender, msg.senderHeartbeat)
}
func startTimeoutCheckLoop() {
for {
pendingAcks.Lock()
for k, pack := range pendingAcks.m {
elapsed := pack.elapsed()
timeoutMillis := uint32(pingdata.nSigma(timeoutToleranceSigmas))
// Ping requests are expected to take quite a bit longer.
// Just call it 2x for now.
if pack.packType == packPingReq {
timeoutMillis *= 2
}
// This pending ACK has taken longer than expected. Mark it as
// timed out.
if elapsed > timeoutMillis {
switch pack.packType {
case packPing:
go doForwardOnTimeout(pack)
case packPingReq:
logDebug(k, "timed out after", timeoutMillis, "milliseconds (dropped PINGREQ)")
if knownNodes.contains(pack.callback) {
updateNodeStatus(pack.callback, StatusDead, currentHeartbeat)
pack.callback.pingMillis = PingTimedOut
}
case packNFP:
logDebug(k, "timed out after", timeoutMillis, "milliseconds (dropped NFP)")
if knownNodes.contains(pack.node) {
updateNodeStatus(pack.node, StatusDead, currentHeartbeat)
pack.callback.pingMillis = PingTimedOut
}
}
delete(pendingAcks.m, k)
}
}
pendingAcks.Unlock()
time.Sleep(time.Millisecond * 100)
}
}
func transmitVerbGenericUDP(node *Node, forwardTo *Node, verb messageVerb, code uint32) error {
// Transmit the ACK
remoteAddr, err := net.ResolveUDPAddr("udp", node.Address())
if err != nil {
return err
}
c, err := net.DialUDP("udp", nil, remoteAddr)
if err != nil {
return err
}
defer c.Close()
msg := newMessage(verb, thisHost, code)
if forwardTo != nil {
msg.addMember(forwardTo, StatusForwardTo, code)
}
// Add members for update.
nodes := getRandomUpdatedNodes(pingRequestCount(), node, thisHost)
// No updates to distribute? Send out a few updates on other known nodes.
if len(nodes) == 0 {
nodes = knownNodes.getRandomNodes(pingRequestCount(), node, thisHost)
}
for _, n := range nodes {
err = msg.addMember(n, n.status, n.heartbeat)
if err != nil {
return err
}
n.emitCounter--
}
// Emit counters for broadcasts can be less than 0. We transmit positive
// numbers, and decrement all the others. At some value < 0, the broadcast
// is removed from the map all together.
broadcast := getBroadcastToEmit()
if broadcast != nil {
if broadcast.emitCounter > 0 {
msg.addBroadcast(broadcast)
}
broadcast.emitCounter--
}
_, err = c.Write(msg.encode())
if err != nil {
return err
}
// Decrement the update counters on those nodes
for _, m := range msg.members {
m.node.emitCounter--
}
logfTrace("Sent %v to %v\n", verb, node.Address())
return nil
}
func transmitVerbForwardUDP(node *Node, downstream *Node, code uint32) error {
key := node.Address() + ":" + strconv.FormatInt(int64(code), 10)
pack := pendingAck{
node: node,
startTime: GetNowInMillis(),
callback: downstream,
packType: packPingReq}
pendingAcks.Lock()
pendingAcks.m[key] = &pack
pendingAcks.Unlock()
return transmitVerbGenericUDP(node, downstream, verbPingRequest, code)
}
func transmitVerbAckUDP(node *Node, code uint32) error {
return transmitVerbGenericUDP(node, nil, verbAck, code)
}
func transmitVerbPingUDP(node *Node, code uint32) error {
key := node.Address() + ":" + strconv.FormatInt(int64(code), 10)
pack := pendingAck{
node: node,
startTime: GetNowInMillis(),
packType: packPing}
pendingAcks.Lock()
pendingAcks.m[key] = &pack
pendingAcks.Unlock()
return transmitVerbGenericUDP(node, nil, verbPing, code)
}
func updateStatusesFromMessage(msg message) {
for _, m := range msg.members {
// If the heartbeat in the message is less then the heartbeat
// associated with the last known status, then we conclude that the
// message is old and we drop it.
if m.heartbeat < m.node.heartbeat {
logfDebug("Message is old (%d vs %d): dropping\n",
m.node.heartbeat, m.heartbeat)
continue
}
switch m.status {
case StatusForwardTo:
// The FORWARD_TO status isn't useful here, so we ignore those
continue
case StatusDead:
// Don't tell ME I'm dead.
if m.node.Address() != thisHost.Address() {
updateNodeStatus(m.node, m.status, m.heartbeat)
AddNode(m.node)
}
default:
updateNodeStatus(m.node, m.status, m.heartbeat)
AddNode(m.node)
}
}
// Obviously, we know the sender is alive. Report it as such.
if msg.senderHeartbeat > msg.sender.heartbeat {
updateNodeStatus(msg.sender, StatusAlive, msg.senderHeartbeat)
}
// First, if we don't know the sender, we add it.
if !knownNodes.contains(msg.sender) {
AddNode(msg.sender)
}
}
// pendingAckType represents an expectation of a response to a previously
// emitted PING, PINGREQ, or NFP.
type pendingAck struct {
startTime uint32
node *Node
callback *Node
callbackCode uint32
packType pendingAckType
}
func (a *pendingAck) elapsed() uint32 {
return GetNowInMillis() - a.startTime
}
// pendingAckType represents the type of PING that a pendingAckType is waiting
// for a response for: PING, PINGREQ, or NFP.
type pendingAckType byte
const (
packPing pendingAckType = iota
packPingReq
packNFP
)
func (p pendingAckType) String() string {
switch p {
case packPing:
return "PING"
case packPingReq:
return "PINGREQ"
case packNFP:
return "NFP"
default:
return "UNDEFINED"
}
}

295
vendor/github.com/clockworksoul/smudge/message.go generated vendored Normal file
View File

@ -0,0 +1,295 @@
/*
Copyright 2016 The Smudge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package smudge
import (
"errors"
"hash/adler32"
"net"
)
// Message contents
// ---[ Base message (11 bytes)]---
// Bytes 00-03 Checksum (32-bit)
// Bytes 04 Verb (one of {PING|ACK|PINGREQ|NFPING})
// Bytes 05-06 Sender response port
// Bytes 07-10 Sender current heartbeat
// ---[ Per member (11 bytes)]---
// Bytes 00 Member status byte
// Bytes 01-04 Member host IP
// Bytes 05-06 Member host response port
// Bytes 07-10 Sender current heartbeat
// ---[ Per broadcast (1 allowed) (11+N bytes) ]
// Bytes 00-03 Origin IP
// Bytes 04-05 Origin response port
// Bytes 06-09 Origin broadcast counter
// Bytes 10-11 Payload length (bytes)
// Bytes 12-NN Payload
type message struct {
sender *Node
senderHeartbeat uint32
verb messageVerb
members []*messageMember
broadcast *Broadcast
}
// Represents a "member" of a message; i.e., a node that the sender knows
// about, about which it wishes to notify the downstream recipient.
type messageMember struct {
heartbeat uint32
node *Node
status NodeStatus
}
// Convenience function. Creates a new message instance.
func newMessage(verb messageVerb, sender *Node, senderHeartbeat uint32) message {
return message{
sender: sender,
senderHeartbeat: senderHeartbeat,
verb: verb,
}
}
// Adds a broadcast to this message. Only one broadcast is allowed; subsequent
// calls will replace an existing broadcast.
func (m *message) addBroadcast(broadcast *Broadcast) {
m.broadcast = broadcast
}
// Adds a member status update to this message. The maximum number of allowed
// members is 2^6 - 1 = 63, though it is incredibly unlikely that this maximum
// will be reached without an absurdly high lambda. There aren't yet many
// 88 billion node clusters (assuming lambda of 2.5).
func (m *message) addMember(n *Node, status NodeStatus, heartbeat uint32) error {
if m.members == nil {
m.members = make([]*messageMember, 0, 32)
} else if len(m.members) >= 63 {
return errors.New("member list overflow")
}
messageMember := messageMember{
heartbeat: heartbeat,
node: n,
status: status}
m.members = append(m.members, &messageMember)
return nil
}
// Message contents
// ---[ Base message (12 bytes)]---
// Bytes 00-03 Checksum (32-bit)
// Bytes 04 Verb (one of {PING|ACK|PINGREQ|NFPING})
// Bytes 05-06 Sender response port
// Bytes 07-10 Sender ID Code
// ---[ Per member (11 bytes)]---
// Bytes 00 Member status byte
// Bytes 01-04 Member host IP
// Bytes 05-06 Member host response port
// Bytes 07-10 Member heartbeat
func (m *message) encode() []byte {
size := 11 + (len(m.members) * 11)
if m.broadcast != nil {
size += 12 + len(m.broadcast.bytes)
}
bytes := make([]byte, size, size)
// An index pointer (start at 4 to accommodate checksum)
p := 4
// Byte 00
// Rightmost 2 bits: verb (one of {P|A|F|N})
// Leftmost 6 bits: number of members in payload
verbByte := byte(len(m.members))
verbByte = (verbByte << 2) | byte(m.verb)
p += encodeByte(verbByte, bytes, p)
// Bytes 01-02 Sender response port
p += encodeUint16(m.sender.port, bytes, p)
// Bytes 03-06 ID Code
p += encodeUint32(m.senderHeartbeat, bytes, p)
// Each member data requires 11 bytes.
for _, member := range m.members {
mnode := member.node
mstatus := member.status
mcode := member.heartbeat
// Byte p + 00
bytes[p] = byte(mstatus)
p++
// Bytes (p + 01) to (p + 04): Originating host IP
ipb := mnode.ip
for i := 0; i < 4; i++ {
bytes[p+i] = ipb[i]
}
p += 4
// Bytes (p + 05) to (p + 06): Originating host response port
p += encodeUint16(mnode.port, bytes, p)
// Bytes (p + 07) to (p + 10): Originating message code
p += encodeUint32(mcode, bytes, p)
}
if m.broadcast != nil {
bbytes := m.broadcast.encode()
for i, v := range bbytes {
bytes[p+i] = v
}
}
checksum := adler32.Checksum(bytes[4:])
encodeUint32(checksum, bytes, 0)
return bytes
}
// If members exist on this message, and that message has the "forward to"
// status, this function returns it; otherwise it returns nil.
func (m *message) getForwardTo() *messageMember {
if len(m.members) > 0 && m.members[0].status == StatusForwardTo {
return m.members[0]
}
return nil
}
// Parses the bytes received in a UDP message.
// If the address:port from the message can't be associated with a known
// (live) node, then an instance of message.sender will be created from
// available data but not explicitly added to the known nodes.
func decodeMessage(sourceIP net.IP, bytes []byte) (message, error) {
var err error
// An index pointer
p := 0
// Bytes 00-03 Checksum (32-bit)
checksumStated, p := decodeUint32(bytes, p)
checksumCalculated := adler32.Checksum(bytes[4:])
if checksumCalculated != checksumStated {
return newMessage(255, nil, 0),
errors.New("checksum failure from " + sourceIP.String())
}
// Byte 04
// Rightmost 2 bits: verb (one of {P|A|F|N})
// Leftmost 6 bits: number of members in payload
v, p := decodeByte(bytes, p)
verb := messageVerb(v & 0x03)
memberCount := int(v >> 2)
// Bytes 05-06 Sender response port
senderPort, p := decodeUint16(bytes, p)
// Bytes 07-10 Sender ID Code
senderHeartbeat, p := decodeUint32(bytes, p)
// Now that we have the IP and port, we can find the Node.
sender := knownNodes.getByIP(sourceIP.To4(), senderPort)
// We don't know this node, so create a new one!
if sender == nil {
sender, _ = CreateNodeByIP(sourceIP.To4(), senderPort)
}
// Now that we have the verb, node, and code, we can build the mesage
m := newMessage(verb, sender, senderHeartbeat)
memberLastIndex := p + (memberCount * 11)
if len(bytes) > p {
m.members = decodeMembers(memberCount, bytes[p:memberLastIndex])
}
if len(bytes) > memberLastIndex {
m.broadcast, err = decodeBroadcast(bytes[memberLastIndex:])
if m.broadcast.origin.IP()[0] == 0 || m.broadcast.origin.Port() == 0 {
err = errors.New("Received originless broadcast!")
}
}
return m, err
}
func decodeMembers(memberCount int, bytes []byte) []*messageMember {
// Bytes 00 Member status byte
// Bytes 01-04 Member host IP
// Bytes 05-06 Member host response port
// Bytes 07-10 Member heartbeat
members := make([]*messageMember, 0, 1)
// An index pointer
p := 0
for p < len(bytes) {
var mstatus NodeStatus
var mip net.IP
var mport uint16
var mcode uint32
var mnode *Node
// Byte 00 Member status byte
mstatus = NodeStatus(bytes[p])
p++
// Bytes 01-04 member IP
if bytes[p] > 0 {
mip = net.IPv4(
bytes[p+0],
bytes[p+1],
bytes[p+2],
bytes[p+3]).To4()
}
p += 4
// Bytes 05-06 member response port
mport, p = decodeUint16(bytes, p)
// Bytes 07-10 member heartbeat
mcode, p = decodeUint32(bytes, p)
if len(mip) > 0 {
// Find the sender by the address associated with the message
mnode = knownNodes.getByIP(mip, mport)
// We still don't know this node, so create a new one!
if mnode == nil {
mnode, _ = CreateNodeByIP(mip, mport)
}
}
member := messageMember{
heartbeat: mcode,
node: mnode,
status: mstatus,
}
members = append(members, &member)
}
return members
}

53
vendor/github.com/clockworksoul/smudge/messageVerb.go generated vendored Normal file
View File

@ -0,0 +1,53 @@
/*
Copyright 2016 The Smudge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package smudge
type messageVerb byte
const (
// VerbPing represents a simple ping. If this ping is not responded to with
// an ack within a timeout period, the pinging host will attempt to ping
// indirectly via one or more additional hosts with a ping request.
verbPing messageVerb = iota
// VerbAck represents a response to a ping request.
verbAck
// VerbPingRequest represents a request made by one host to another to ping
// a third host whose live status is in question.
verbPingRequest
// VerbNonForwardingPing represents a ping in response to a ping request.
// If the ping times out, the host does not follow up with a ping request
// to any other hosts.
verbNonForwardingPing
)
func (v messageVerb) String() string {
switch v {
case verbPing:
return "PING"
case verbAck:
return "ACK"
case verbPingRequest:
return "PINGREQ"
case verbNonForwardingPing:
return "NFPING"
default:
return "UNDEFINED"
}
}

225
vendor/github.com/clockworksoul/smudge/message_test.go generated vendored Normal file
View File

@ -0,0 +1,225 @@
/*
Copyright 2016 The Smudge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package smudge
import (
"net"
"reflect"
"testing"
)
// Identical but distinct instance from node1b
var node1a = Node{
ip: net.IP([]byte{127, 0, 0, 1}),
port: 1234,
timestamp: 87878787,
status: StatusAlive,
emitCounter: 42,
pingMillis: PingNoData}
// Identical but distinct instance from node1a
var node1b = Node{
ip: net.IP([]byte{127, 0, 0, 1}),
port: 1234,
timestamp: 87878787,
status: StatusAlive,
emitCounter: 42,
pingMillis: PingNoData}
// Different from node1a and node1b
var node2 = Node{
ip: net.IP([]byte{127, 0, 0, 1}),
port: 10001,
timestamp: GetNowInMillis(),
status: StatusAlive,
emitCounter: 42,
pingMillis: PingNoData}
var message1a = message{
sender: &node1a,
senderHeartbeat: 255,
verb: verbPing}
var message1b = message{
sender: &node1b,
senderHeartbeat: 255,
verb: verbPing}
var message2 = message{
sender: &node2,
senderHeartbeat: 23,
verb: verbAck}
// Does deep equality of two different but identical messages return true?
func TestDeepEqualityTrue(t *testing.T) {
if !reflect.DeepEqual(message1a, message1b) {
t.Fail()
}
}
// Does deep equality of two different messages return false?
func TestDeepEqualityFalse(t *testing.T) {
if reflect.DeepEqual(message1a, message2) {
t.Fail()
}
}
// Endode and decode a simple message without any members, and see if
// the input/output match.
func TestEncodeDecodeBasic(t *testing.T) {
timestamp := uint32(87878787)
sender := Node{
ip: net.IP([]byte{127, 0, 0, 1}),
port: 1234,
timestamp: timestamp,
pingMillis: PingNoData,
}
message := message{
sender: &sender,
senderHeartbeat: 255,
verb: verbPing}
ip := net.IP([]byte{127, 0, 0, 1})
bytes := message.encode()
decoded, err := decodeMessage(ip, bytes)
decoded.sender.timestamp = timestamp
if err != nil {
t.Error(err)
}
if !reflect.DeepEqual(message, decoded) {
t.Error("Messages do not match:")
t.Log(" Input:", message)
t.Log("Output:", decoded)
t.Log(" Input node:", message.sender)
t.Log("Output node:", decoded.sender)
}
}
// Endode and decode a simple message without one member, and see if
// the input/output match.
func TestEncodeDecode1Member(t *testing.T) {
timestamp := uint32(87878787)
sender := Node{
ip: net.IP([]byte{127, 0, 0, 1}),
port: 1234,
timestamp: timestamp,
pingMillis: PingNoData,
}
member := Node{
ip: net.IP([]byte{127, 0, 0, 2}),
port: 9000,
timestamp: timestamp,
pingMillis: PingNoData,
}
message := message{
sender: &sender,
senderHeartbeat: 255,
verb: verbPing}
message.addMember(&member, StatusDead, 38)
if len(message.members) != 1 {
t.Error("No member in the input members list!")
}
ip := net.IP([]byte{127, 0, 0, 1})
bytes := message.encode()
decoded, err := decodeMessage(ip, bytes)
decoded.sender.timestamp = timestamp
decoded.members[0].node.timestamp = timestamp
if err != nil {
t.Error(err)
}
if len(decoded.members) != 1 {
t.Error("No member in the output members list!")
}
if !reflect.DeepEqual(message, decoded) {
t.Error("Messages do not match")
t.Log(" Input:", message.members[0])
t.Log("Output:", decoded.members[0])
t.Log(" Input node:", message.members[0].node)
t.Log("Output node:", decoded.members[0].node)
}
}
// Endode and decode a simple message without one member, and see if
// the input/output match.
func TestEncodeDecode1MemberBroadcast(t *testing.T) {
timestamp := uint32(87878787)
sender := Node{
ip: net.IP([]byte{127, 0, 0, 1}),
port: 1234,
timestamp: timestamp,
pingMillis: PingNoData}
member := Node{
ip: net.IP([]byte{127, 0, 0, 2}),
port: 9000,
timestamp: timestamp,
pingMillis: PingNoData}
message := message{
sender: &sender,
senderHeartbeat: 255,
verb: verbPing}
message.addMember(&member, StatusDead, 38)
broadcast := Broadcast{
bytes: []byte("This is a message"),
origin: &sender,
index: 42}
message.addBroadcast(&broadcast)
if message.broadcast == nil {
t.Error("Broadcast not set properly")
}
ip := net.IP([]byte{127, 0, 0, 1})
bytes := message.encode()
decoded, err := decodeMessage(ip, bytes)
decoded.sender.timestamp = timestamp
decoded.members[0].node.timestamp = timestamp
if err != nil {
t.Error(err)
}
if decoded.broadcast == nil {
t.Error("Broadcast not decoded")
}
message.broadcast.origin = nil
decoded.broadcast.origin = nil
if !reflect.DeepEqual(message.broadcast, decoded.broadcast) {
t.Error("Broadcasts do not match:")
t.Error(" Input bcast:", message.broadcast)
t.Error("Output bcast:", decoded.broadcast)
}
}

111
vendor/github.com/clockworksoul/smudge/node.go generated vendored Normal file
View File

@ -0,0 +1,111 @@
/*
Copyright 2016 The Smudge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package smudge
import (
"fmt"
"net"
"time"
)
const (
// PingNoData is returned by n.PingMillis() to indicate that a node has
// not yet been pinged, and therefore no ping data exists.
PingNoData int = -1
// PingTimedOut is returned by n.PingMillis() to indicate that a node's
// last PING timed out. This is the typical value for dead nodes.
PingTimedOut int = -2
)
// Node represents a single node in the cluster.
type Node struct {
ip net.IP
port uint16
timestamp uint32
address string
pingMillis int
status NodeStatus
emitCounter int8
heartbeat uint32
}
// Address rReturns the address for this node in string format, which is simply
// the node's local IP and listen port. This is used as a unique identifier
// throughout the code base.
func (n *Node) Address() string {
if n.address == "" {
n.address = nodeAddressString(n.ip, n.port)
}
return n.address
}
// Age returns the time since we last heard from this node, in milliseconds.
func (n *Node) Age() uint32 {
return GetNowInMillis() - n.timestamp
}
// EmitCounter returns the number of times remaining that current status
// will be emitted by this node to other nodes.
func (n *Node) EmitCounter() int8 {
return n.emitCounter
}
// IP returns the IP associated with this node.
func (n *Node) IP() net.IP {
return n.ip
}
// PingMillis returns the milliseconds transpired between the most recent
// PING to this node and its responded ACK. If this node has not yet been
// pinged, this vaue will be PingNoData (-1). If this node's last PING timed
// out, this value will be PingTimedOut (-2).
func (n *Node) PingMillis() int {
return n.pingMillis
}
// Port returns the port associated with this node.
func (n *Node) Port() uint16 {
return n.port
}
// Status returns this node's current status.
func (n *Node) Status() NodeStatus {
return n.status
}
// Timestamp returns the timestamp of this node's last ping or status update,
// in milliseconds from the epoch
func (n *Node) Timestamp() uint32 {
return n.timestamp
}
// Touch updates the timestamp to the local time in milliseconds.
func (n *Node) Touch() {
n.timestamp = GetNowInMillis()
}
func nodeAddressString(ip net.IP, port uint16) string {
return fmt.Sprintf("%s:%d", ip, port)
}
// GetNowInMillis returns the current local time in milliseconds since the
// epoch.
func GetNowInMillis() uint32 {
return uint32(time.Now().UnixNano() / int64(time.Millisecond))
}

185
vendor/github.com/clockworksoul/smudge/nodeMap.go generated vendored Normal file
View File

@ -0,0 +1,185 @@
/*
Copyright 2016 The Smudge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package smudge
import (
"math/rand"
"net"
"sync"
)
type nodeMap struct {
sync.RWMutex
nodes map[string]*Node
}
func (m *nodeMap) init() {
m.nodes = make(map[string]*Node)
}
// Adds a node. Returns key, value.
// Updates node heartbeat in the process.
// This is the method called by all Add* functions.
func (m *nodeMap) add(node *Node) (string, *Node, error) {
key := node.Address()
m.Lock()
m.nodes[node.Address()] = node
m.Unlock()
return key, node, nil
}
func (m *nodeMap) delete(node *Node) (string, *Node, error) {
m.Lock()
delete(m.nodes, node.Address())
m.Unlock()
return node.Address(), node, nil
}
func (m *nodeMap) contains(node *Node) bool {
return m.containsByAddress(node.Address())
}
func (m *nodeMap) containsByAddress(address string) bool {
m.RLock()
_, ok := m.nodes[address]
m.RUnlock()
return ok
}
// Returns a pointer to the requested Node
func (m *nodeMap) getByAddress(address string) *Node {
m.RLock()
node, _ := m.nodes[address]
m.RUnlock()
return node
}
// Returns a pointer to the requested Node. If port is 0, is uses the value
// of GetListenPort(). If the Node cannot be found, this returns nil.
func (m *nodeMap) getByIP(ip net.IP, port uint16) *Node {
if port == 0 {
port = uint16(GetListenPort())
}
address := nodeAddressString(ip, port)
return m.getByAddress(address)
}
// Returns a slice of Node[] of from 0 to len(nodes) nodes.
// If size is < len(nodes), that many nodes are randomly chosen and
// returned.
func (m *nodeMap) getRandomNodes(size int, exclude ...*Node) []*Node {
allNodes := m.values()
if size == 0 {
size = len(allNodes)
}
// First, shuffle the allNodes slice
for i := range allNodes {
j := rand.Intn(i + 1)
allNodes[i], allNodes[j] = allNodes[j], allNodes[i]
}
// Copy the first size nodes that are not otherwise excluded
filtered := make([]*Node, 0, len(allNodes))
// Horribly inefficient. Fix this later.
var c int
Outer:
for _, n := range allNodes {
// Is the node in the excluded list?
for _, e := range exclude {
if n.Address() == e.Address() {
continue Outer
}
}
// Now we can append it
filtered = append(filtered, n)
c++
if c >= size {
break Outer
}
}
return filtered
}
func (m *nodeMap) length() int {
return len(m.nodes)
}
func (m *nodeMap) lengthWithStatus(status NodeStatus) int {
m.RLock()
i := 0
for _, v := range m.nodes {
if v.status == status {
i++
}
}
m.RUnlock()
return i
}
func (m *nodeMap) keys() []string {
m.RLock()
keys := make([]string, len(m.nodes))
i := 0
for k := range m.nodes {
keys[i] = k
i++
}
m.RUnlock()
return keys
}
func (m *nodeMap) values() []*Node {
m.RLock()
values := make([]*Node, len(m.nodes))
i := 0
for _, v := range m.nodes {
values[i] = v
i++
}
m.RUnlock()
return values
}

50
vendor/github.com/clockworksoul/smudge/nodeStatus.go generated vendored Normal file
View File

@ -0,0 +1,50 @@
/*
Copyright 2016 The Smudge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package smudge
// NodeStatus represents the believed status of a member node.
type NodeStatus byte
const (
// StatusUnknown is the default node status of newly-created nodes.
StatusUnknown NodeStatus = iota
// StatusAlive indicates that a node is alive and healthy.
StatusAlive
// StatusDead indicatates that a node is dead and no longer healthy.
StatusDead
// StatusForwardTo is a pseudo status used by message to indicate
// the target of a ping request.
StatusForwardTo
)
func (s NodeStatus) String() string {
switch s {
case StatusUnknown:
return "UNKNOWN"
case StatusAlive:
return "ALIVE"
case StatusDead:
return "DEAD"
case StatusForwardTo:
return "FORWARD_TO"
default:
return "UNDEFINED"
}
}

117
vendor/github.com/clockworksoul/smudge/pingData.go generated vendored Normal file
View File

@ -0,0 +1,117 @@
/*
Copyright 2016 The Smudge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package smudge
import (
"math"
"sync"
)
type pingData struct {
sync.RWMutex
// The ping data. Initialized with default values by NewPingData()
pings []uint32
// The index in pings where the next datapoint will be added
pointer int
// The last calulcated mean. Recalculated if updated is true
lastMean float64
// The last calulcated standard deviation. Recalculated if updated is true
lastStddev float64
// The modified flag. Set to true when a datapoint is added
updated bool
}
func newPingData(initialAverage int, historyCount int) pingData {
newPings := make([]uint32, historyCount, historyCount)
for i := 0; i < historyCount; i++ {
newPings[i] = uint32(initialAverage)
}
return pingData{pings: newPings, updated: true}
}
func (pd *pingData) add(datapoint uint32) {
pd.Lock()
pd.pings[pd.pointer] = datapoint
// Advance the pointer
pd.pointer++
pd.pointer %= len(pd.pings)
pd.updated = true
pd.Unlock()
}
// mean returns the simple mean (average) of the collected datapoints.
func (pd *pingData) mean() float64 {
pd.data()
return pd.lastMean
}
// Returns the mean modified by the requested number of sigmas
func (pd *pingData) nSigma(sigmas float64) float64 {
mean, stddev := pd.data()
return mean + (sigmas * stddev)
}
// stddev returns the standard deviation of the collected datapoints
func (pd *pingData) stddev() float64 {
pd.data()
return pd.lastStddev
}
// Returns both mean and standard deviation
func (pd *pingData) data() (float64, float64) {
if pd.updated {
pd.Lock()
// Calculate the mean
var accumulator float64
for _, d := range pd.pings {
accumulator += float64(d)
}
pd.lastMean = accumulator / float64(len(pd.pings))
// Subtract the mean and square the result; calculcate the mean
accumulator = 0.0 // Reusing accumulator.
for _, d := range pd.pings {
diff := pd.lastMean - float64(d)
accumulator += math.Pow(diff, 2.0)
}
squareDiffMean := accumulator / float64(len(pd.pings))
// Sqrt the square diffs mean and we have our stddev
pd.lastStddev = math.Sqrt(squareDiffMean)
pd.updated = false
pd.Unlock()
}
return pd.lastMean, pd.lastStddev
}

209
vendor/github.com/clockworksoul/smudge/properties.go generated vendored Normal file
View File

@ -0,0 +1,209 @@
/*
Copyright 2016 The Smudge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package smudge
import (
"os"
"regexp"
"strconv"
"strings"
)
// Provides a series of methods and constants that revolve around the getting
// (or programmatically setting/overriding) environmental properties, returning
// default values if not set.
const (
// EnvVarHeartbeatMillis is the name of the environment variable that
// sets the heartbeat frequency (in millis).
EnvVarHeartbeatMillis = "SMUDGE_HEARTBEAT_MILLIS"
// DefaultHeartbeatMillis is the default heartbeat frequency (in millis).
DefaultHeartbeatMillis int = 250
// EnvVarInitialHosts is the name of the environment variable that sets
// the initial known hosts. The value it sets should be a comma-delimitted
// string of one or more IP:PORT pairs (port is optional if it matched the
// value of SMUDGE_LISTEN_PORT).
EnvVarInitialHosts = "SMUDGE_INITIAL_HOSTS"
// DefaultInitialHosts default lists of initially known hosts.
DefaultInitialHosts string = ""
// EnvVarListenPort is the name of the environment variable that sets
// the UDP listen port.
EnvVarListenPort = "SMUDGE_LISTEN_PORT"
// DefaultListenPort is the default UDP listen port.
DefaultListenPort int = 9999
// EnvVarMaxBroadcastBytes is the name of the environment variable that
// the maximum byte length for broadcast payloads. Note that increasing
// this runs the risk of packet fragmentation and dropped messages.
EnvVarMaxBroadcastBytes = "SMUDGE_MAX_BROADCAST_BYTES"
// DefaultMaxBroadcastBytes is the default maximum byte length for
// broadcast payloads. This is guided by the maximum safe UDP packet size
// of 508 bytes, which must also contain status updates and additional
// message overhead.
DefaultMaxBroadcastBytes int = 256
)
var heartbeatMillis int
var listenPort int
var initialHosts []string
var maxBroadcastBytes int
const stringListDelimitRegex = "\\s*((,\\s*)|(\\s+))"
// GetHeartbeatMillis gets this host's heartbeat frequency in milliseconds.
func GetHeartbeatMillis() int {
if heartbeatMillis == 0 {
heartbeatMillis = getIntVar(EnvVarHeartbeatMillis, DefaultHeartbeatMillis)
}
return heartbeatMillis
}
// GetInitialHosts returns the list of initially known hosts.
func GetInitialHosts() []string {
if initialHosts == nil {
initialHosts = getStringArrayVar(EnvVarInitialHosts, DefaultInitialHosts)
}
return initialHosts
}
// GetListenPort returns the port that this host will listen on.
func GetListenPort() int {
if listenPort == 0 {
listenPort = getIntVar(EnvVarListenPort, DefaultListenPort)
}
return listenPort
}
// GetMaxBroadcastBytes returns the maximum byte length for broadcast payloads.
func GetMaxBroadcastBytes() int {
if maxBroadcastBytes == 0 {
maxBroadcastBytes = getIntVar(EnvVarMaxBroadcastBytes, DefaultMaxBroadcastBytes)
}
return maxBroadcastBytes
}
// SetHeartbeatMillis sets this nodes heartbeat frequency. Unlike
// SetListenPort(), calling this function after Begin() has been called will
// have an effect.
func SetHeartbeatMillis(val int) {
if val == 0 {
heartbeatMillis = DefaultListenPort
} else {
heartbeatMillis = val
}
heartbeatMillis = val
}
// SetListenPort sets the UDP port to listen on. It has no effect once
// Begin() has been called.
func SetListenPort(val int) {
if val == 0 {
listenPort = DefaultListenPort
} else {
listenPort = val
}
}
// SetMaxBroadcastBytes sets the maximum byte length for broadcast payloads.
// Note that increasing this beyond the default of 256 runs the risk of packet
// fragmentation and dropped messages.
func SetMaxBroadcastBytes(val int) {
if val == 0 {
maxBroadcastBytes = DefaultMaxBroadcastBytes
} else {
maxBroadcastBytes = val
}
}
// Gets an environmental variable "key". If it does not exist, "defaultVal" is
// returned; if it does, it attempts to convert to an integer, returning
// "defaultVal" is it fails.
func getIntVar(key string, defaultVal int) int {
valueString := os.Getenv(key)
valueInt := defaultVal
if valueString != "" {
i, err := strconv.Atoi(key)
if err != nil {
logfWarn("Failed to parse env property %s: %s is not "+
"an integer. Using default.\n", key, valueString)
} else {
valueInt = i
}
}
return valueInt
}
// Gets an environmental variable "key". If it does not exist, "defaultVal" is
// returned; if it does, it attempts to convert to a string slice, returning
// "defaultVal" is it fails.
func getStringArrayVar(key string, defaultVal string) []string {
valueString := os.Getenv(key)
if valueString == "" {
valueString = defaultVal
}
valueSlice := splitDelimmitedString(valueString, stringListDelimitRegex)
return valueSlice
}
// Splits a string on a regular expression.
func splitDelimmitedString(str string, regex string) []string {
var result []string
str = strings.TrimSpace(str)
if str != "" {
reg := regexp.MustCompile(regex)
indices := reg.FindAllStringIndex(str, -1)
result = make([]string, len(indices)+1)
lastStart := 0
for i, val := range indices {
result[i] = str[lastStart:val[0]]
lastStart = val[1]
}
result[len(indices)] = str[lastStart:]
// Special case of single empty string
if len(result) == 1 && result[0] == "" {
result = make([]string, 0, 0)
}
}
return result
}

View File

@ -0,0 +1,111 @@
/*
Copyright 2016 The Smudge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package smudge
import (
"testing"
)
func TestSplitString0a(t *testing.T) {
str := ""
split := splitDelimmitedString(str, stringListDelimitRegex)
if len(split) != 0 {
t.Errorf("len=%d contents=%v\n", len(split), split)
}
}
func TestSplitString0b(t *testing.T) {
str := " "
split := splitDelimmitedString(str, stringListDelimitRegex)
if len(split) != 0 {
t.Errorf("len=%d contents=%v\n", len(split), split)
}
}
func TestSplitString1(t *testing.T) {
str := "foo"
split := splitDelimmitedString(str, stringListDelimitRegex)
if len(split) != 1 || split[0] != "foo" {
t.Errorf("len=%d contents=%v\n", len(split), split)
}
}
func TestSplitString2a(t *testing.T) {
str := "foo bar"
split := splitDelimmitedString(str, stringListDelimitRegex)
if len(split) != 2 || split[0] != "foo" {
t.Errorf("len=%d contents=%v\n", len(split), split)
}
}
func TestSplitString2b(t *testing.T) {
str := "foo, bar"
split := splitDelimmitedString(str, stringListDelimitRegex)
if len(split) != 2 || split[0] != "foo" {
t.Errorf("len=%d contents=%v\n", len(split), split)
}
}
func TestSplitString2c(t *testing.T) {
str := "foo bar"
split := splitDelimmitedString(str, stringListDelimitRegex)
if len(split) != 2 || split[0] != "foo" || split[1] != "bar" {
t.Errorf("len=%d contents=%v\n", len(split), split)
}
}
func TestSplitString2d(t *testing.T) {
str := "localhost:10000,localhost:9999"
split := splitDelimmitedString(str, stringListDelimitRegex)
if len(split) != 2 || split[0] != "localhost:10000" {
t.Errorf("len=%d contents=%v\n", len(split), split)
}
}
func TestSplitString3a(t *testing.T) {
str := "foo bar bat"
split := splitDelimmitedString(str, stringListDelimitRegex)
if len(split) != 3 || split[0] != "foo" {
t.Errorf("len=%d contents=%v\n", len(split), split)
}
}
func TestSplitString3b(t *testing.T) {
str := "foo, bar, bat"
split := splitDelimmitedString(str, stringListDelimitRegex)
if len(split) != 3 || split[0] != "foo" {
t.Errorf("len=%d contents=%v\n", len(split), split)
}
}
func TestSplitString3c(t *testing.T) {
str := "foo bar, bat"
split := splitDelimmitedString(str, stringListDelimitRegex)
if len(split) != 3 || split[0] != "foo" {
t.Errorf("len=%d contents=%v\n", len(split), split)
}
}

325
vendor/github.com/clockworksoul/smudge/registry.go generated vendored Normal file
View File

@ -0,0 +1,325 @@
/*
Copyright 2016 The Smudge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package smudge
import (
"errors"
"net"
"sort"
"strconv"
"strings"
"sync"
)
// All known nodes, living and dead. Dead nodes are pinged (far) less often,
// and are eventually removed
var knownNodes = nodeMap{}
// All nodes that have been updated "recently", living and dead
var updatedNodes = nodeMap{}
var deadNodeRetries = struct {
sync.RWMutex
m map[string]*deadNodeCounter
}{m: make(map[string]*deadNodeCounter)}
const maxDeadNodeRetries = 10
func init() {
knownNodes.init()
updatedNodes.init()
}
/******************************************************************************
* Exported functions (for public consumption)
*****************************************************************************/
// AddNode can be used to explicitly add a node to the list of known live
// nodes. Updates the node timestamp but DOES NOT implicitly update the node's
// status; you need to do this explicitly.
func AddNode(node *Node) (*Node, error) {
if !knownNodes.contains(node) {
if node.status == StatusUnknown {
logWarn(node.Address(),
"does not have a status! Setting to",
StatusAlive)
UpdateNodeStatus(node, StatusAlive)
} else if node.status == StatusForwardTo {
panic("invalid status: " + StatusForwardTo.String())
}
node.Touch()
_, n, err := knownNodes.add(node)
logfInfo("Adding host: %s (total=%d live=%d dead=%d)\n",
node.Address(),
knownNodes.length(),
knownNodes.lengthWithStatus(StatusAlive),
knownNodes.lengthWithStatus(StatusDead))
knownNodesModifiedFlag = true
return n, err
}
return node, nil
}
// CreateNodeByAddress will create and return a new node when supplied with a
// node address ("ip:port" string). This doesn't add the node to the list of
// live nodes; use AddNode().
func CreateNodeByAddress(address string) (*Node, error) {
ip, port, err := parseNodeAddress(address)
if err == nil {
return CreateNodeByIP(ip, port)
}
return nil, err
}
// CreateNodeByIP will create and return a new node when supplied with an
// IP address and port number. This doesn't add the node to the list of live
// nodes; use AddNode().
func CreateNodeByIP(ip net.IP, port uint16) (*Node, error) {
node := Node{
ip: ip,
port: port,
timestamp: GetNowInMillis(),
pingMillis: PingNoData,
}
return &node, nil
}
// GetLocalIP queries the host interface to determine the local IPv4 of this
// machine. If a local IPv4 cannot be found, then nil is returned. If the
// query to the underlying OS fails, an error is returned.
func GetLocalIP() (net.IP, error) {
var ip net.IP
addrs, err := net.InterfaceAddrs()
if err != nil {
return ip, err
}
for _, a := range addrs {
if ipnet, ok := a.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
ip = ipnet.IP.To4()
break
}
}
}
return ip, nil
}
// AllNodes will return a list of all nodes known at the time of the request,
// including nodes that have been marked as "dead" but haven't yet been
// removed from the registry.
func AllNodes() []*Node {
return knownNodes.values()
}
// HealthyNodes will return a list of all nodes known at the time of the
// request with a healthy status.
func HealthyNodes() []*Node {
values := knownNodes.values()
filtered := make([]*Node, 0, len(values))
for _, v := range values {
if v.Status() == StatusAlive {
filtered = append(filtered, v)
}
}
return filtered
}
// RemoveNode can be used to explicitly remove a node from the list of known
// live nodes. Updates the node timestamp but DOES NOT implicitly update the
// node's status; you need to do this explicitly.
func RemoveNode(node *Node) (*Node, error) {
if knownNodes.contains(node) {
node.Touch()
_, n, err := knownNodes.delete(node)
logfInfo("Removing host: %s (total=%d live=%d dead=%d)\n",
node.Address(),
knownNodes.length(),
knownNodes.lengthWithStatus(StatusAlive),
knownNodes.lengthWithStatus(StatusDead))
knownNodesModifiedFlag = true
return n, err
}
return node, nil
}
// UpdateNodeStatus assigns a new status for the specified node and adds it to
// the list of recently updated nodes. If the status is StatusDead, then the
// node will be moved from the live nodes list to the dead nodes list.
func UpdateNodeStatus(node *Node, status NodeStatus) {
updateNodeStatus(node, status, node.heartbeat)
}
/******************************************************************************
* Private functions (for internal use only)
*****************************************************************************/
func getRandomUpdatedNodes(size int, exclude ...*Node) []*Node {
updatedNodesCopy := nodeMap{}
updatedNodesCopy.init()
// Prune nodes with emit counters of 0 (or less) from the map. Any
// others we copy into a secondary nodemap.
for _, n := range updatedNodes.values() {
if n.emitCounter <= 0 {
logDebug("Removing", n.Address(), "from recently updated list")
updatedNodes.delete(n)
} else {
updatedNodesCopy.add(n)
}
}
// Exclude the exclusions
for _, ex := range exclude {
updatedNodesCopy.delete(ex)
}
// Put the newest nodes on top.
updatedNodesSlice := updatedNodesCopy.values()
sort.Sort(byNodeEmitCounter(updatedNodesSlice))
// Grab and return the top N
if size > len(updatedNodesSlice) {
size = len(updatedNodesSlice)
}
return updatedNodesSlice[:size]
}
func parseNodeAddress(hostAndMaybePort string) (net.IP, uint16, error) {
var host string
var ip net.IP
var port uint16
var err error
if strings.Contains(hostAndMaybePort, ":") {
splode := strings.Split(hostAndMaybePort, ":")
if len(splode) == 2 {
p, e := strconv.ParseUint(splode[1], 10, 16)
host = splode[0]
port = uint16(p)
err = e
} else {
err = errors.New("too many colons in argument " + hostAndMaybePort)
}
} else {
host = hostAndMaybePort
port = uint16(GetListenPort())
}
ips, err := net.LookupIP(host)
if err != nil {
return ip, port, err
}
for _, i := range ips {
if i.To4() != nil {
ip = i.To4()
}
}
if ip.IsLoopback() {
ip, err = GetLocalIP()
if ip == nil {
logWarn("Warning: Could not resolve host IP. Using 127.0.0.1")
ip = []byte{127, 0, 0, 1}
}
}
return ip, port, err
}
// UpdateNodeStatus assigns a new status for the specified node and adds it to
// the list of recently updated nodes. If the status is StatusDead, then the
// node will be moved from the live nodes list to the dead nodes list.
func updateNodeStatus(node *Node, status NodeStatus, heartbeat uint32) {
if node.status != status {
if heartbeat < node.heartbeat {
logfWarn("Decreasing known node heartbeat value from %d to %d\n",
node.heartbeat,
heartbeat)
}
node.timestamp = GetNowInMillis()
node.status = status
node.emitCounter = int8(emitCount())
node.heartbeat = heartbeat
// If this isn't in the recently updated list, add it.
if !updatedNodes.contains(node) {
updatedNodes.add(node)
}
if status != StatusDead {
deadNodeRetries.Lock()
delete(deadNodeRetries.m, node.Address())
deadNodeRetries.Unlock()
}
logfInfo("Updating host: %s to %s (total=%d live=%d dead=%d)\n",
node.Address(),
status,
knownNodes.length(),
knownNodes.lengthWithStatus(StatusAlive),
knownNodes.lengthWithStatus(StatusDead))
doStatusUpdate(node, status)
}
}
type deadNodeCounter struct {
retry int
retryCountdown int
}
// byNodeEmitCounter implements sort.Interface for []*Node based on
// the emitCounter field.
type byNodeEmitCounter []*Node
func (a byNodeEmitCounter) Len() int {
return len(a)
}
func (a byNodeEmitCounter) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}
func (a byNodeEmitCounter) Less(i, j int) bool {
return a[i].emitCounter > a[j].emitCounter
}

View File

@ -0,0 +1 @@
This directory is contains a simple CLI tool used to test Smudge's member discovery and status dissemination functionality.

View File

@ -0,0 +1,60 @@
/*
Copyright 2016 The Smudge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"flag"
"fmt"
"github.com/clockworksoul/smudge"
)
func main() {
var nodeAddress string
var heartbeatMillis int
var listenPort int
var err error
flag.StringVar(&nodeAddress, "node", "", "Initial node")
flag.IntVar(&listenPort, "port",
int(smudge.GetListenPort()),
"The bind port")
flag.IntVar(&heartbeatMillis, "hbf",
int(smudge.GetHeartbeatMillis()),
"The heartbeat frequency in milliseconds")
flag.Parse()
smudge.SetLogThreshold(smudge.LogInfo)
smudge.SetListenPort(listenPort)
smudge.SetHeartbeatMillis(heartbeatMillis)
if nodeAddress != "" {
node, err := smudge.CreateNodeByAddress(nodeAddress)
if err == nil {
smudge.AddNode(node)
}
}
if err == nil {
smudge.Begin()
} else {
fmt.Println(err)
}
}