296 lines
7.6 KiB
Go
296 lines
7.6 KiB
Go
|
/*
|
||
|
Copyright 2016 The Smudge Authors.
|
||
|
|
||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
you may not use this file except in compliance with the License.
|
||
|
You may obtain a copy of the License at
|
||
|
|
||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||
|
|
||
|
Unless required by applicable law or agreed to in writing, software
|
||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
See the License for the specific language governing permissions and
|
||
|
limitations under the License.
|
||
|
*/
|
||
|
|
||
|
package smudge
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"hash/adler32"
|
||
|
"net"
|
||
|
)
|
||
|
|
||
|
// Message contents
|
||
|
// ---[ Base message (11 bytes)]---
|
||
|
// Bytes 00-03 Checksum (32-bit)
|
||
|
// Bytes 04 Verb (one of {PING|ACK|PINGREQ|NFPING})
|
||
|
// Bytes 05-06 Sender response port
|
||
|
// Bytes 07-10 Sender current heartbeat
|
||
|
// ---[ Per member (11 bytes)]---
|
||
|
// Bytes 00 Member status byte
|
||
|
// Bytes 01-04 Member host IP
|
||
|
// Bytes 05-06 Member host response port
|
||
|
// Bytes 07-10 Member heartbeat
|
||
|
// ---[ Per broadcast (1 allowed) (12+N bytes) ]
|
||
|
// Bytes 00-03 Origin IP
|
||
|
// Bytes 04-05 Origin response port
|
||
|
// Bytes 06-09 Origin broadcast counter
|
||
|
// Bytes 10-11 Payload length (bytes)
|
||
|
// Bytes 12-NN Payload
|
||
|
|
||
|
type message struct {
|
||
|
sender *Node
|
||
|
senderHeartbeat uint32
|
||
|
verb messageVerb
|
||
|
members []*messageMember
|
||
|
broadcast *Broadcast
|
||
|
}
|
||
|
|
||
|
// Represents a "member" of a message; i.e., a node that the sender knows
|
||
|
// about, about which it wishes to notify the downstream recipient.
|
||
|
type messageMember struct {
|
||
|
heartbeat uint32
|
||
|
node *Node
|
||
|
status NodeStatus
|
||
|
}
|
||
|
|
||
|
// Convenience function. Creates a new message instance.
|
||
|
func newMessage(verb messageVerb, sender *Node, senderHeartbeat uint32) message {
|
||
|
return message{
|
||
|
sender: sender,
|
||
|
senderHeartbeat: senderHeartbeat,
|
||
|
verb: verb,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Adds a broadcast to this message. Only one broadcast is allowed; subsequent
|
||
|
// calls will replace an existing broadcast.
|
||
|
func (m *message) addBroadcast(broadcast *Broadcast) {
|
||
|
m.broadcast = broadcast
|
||
|
}
|
||
|
|
||
|
// Adds a member status update to this message. The maximum number of allowed
|
||
|
// members is 2^6 - 1 = 63, though it is incredibly unlikely that this maximum
|
||
|
// will be reached without an absurdly high lambda. There aren't yet many
|
||
|
// 88 billion node clusters (assuming lambda of 2.5).
|
||
|
func (m *message) addMember(n *Node, status NodeStatus, heartbeat uint32) error {
|
||
|
if m.members == nil {
|
||
|
m.members = make([]*messageMember, 0, 32)
|
||
|
} else if len(m.members) >= 63 {
|
||
|
return errors.New("member list overflow")
|
||
|
}
|
||
|
|
||
|
messageMember := messageMember{
|
||
|
heartbeat: heartbeat,
|
||
|
node: n,
|
||
|
status: status}
|
||
|
|
||
|
m.members = append(m.members, &messageMember)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Message contents
|
||
|
// ---[ Base message (12 bytes)]---
|
||
|
// Bytes 00-03 Checksum (32-bit)
|
||
|
// Bytes 04 Verb (one of {PING|ACK|PINGREQ|NFPING})
|
||
|
// Bytes 05-06 Sender response port
|
||
|
// Bytes 07-10 Sender ID Code
|
||
|
// ---[ Per member (11 bytes)]---
|
||
|
// Bytes 00 Member status byte
|
||
|
// Bytes 01-04 Member host IP
|
||
|
// Bytes 05-06 Member host response port
|
||
|
// Bytes 07-10 Member heartbeat
|
||
|
|
||
|
func (m *message) encode() []byte {
|
||
|
size := 11 + (len(m.members) * 11)
|
||
|
if m.broadcast != nil {
|
||
|
size += 12 + len(m.broadcast.bytes)
|
||
|
}
|
||
|
|
||
|
bytes := make([]byte, size, size)
|
||
|
|
||
|
// An index pointer (start at 4 to accommodate checksum)
|
||
|
p := 4
|
||
|
|
||
|
// Byte 00
|
||
|
// Rightmost 2 bits: verb (one of {P|A|F|N})
|
||
|
// Leftmost 6 bits: number of members in payload
|
||
|
verbByte := byte(len(m.members))
|
||
|
verbByte = (verbByte << 2) | byte(m.verb)
|
||
|
p += encodeByte(verbByte, bytes, p)
|
||
|
|
||
|
// Bytes 01-02 Sender response port
|
||
|
p += encodeUint16(m.sender.port, bytes, p)
|
||
|
|
||
|
// Bytes 03-06 ID Code
|
||
|
p += encodeUint32(m.senderHeartbeat, bytes, p)
|
||
|
|
||
|
// Each member data requires 11 bytes.
|
||
|
for _, member := range m.members {
|
||
|
mnode := member.node
|
||
|
mstatus := member.status
|
||
|
mcode := member.heartbeat
|
||
|
|
||
|
// Byte p + 00
|
||
|
bytes[p] = byte(mstatus)
|
||
|
p++
|
||
|
|
||
|
// Bytes (p + 01) to (p + 04): Originating host IP
|
||
|
ipb := mnode.ip
|
||
|
for i := 0; i < 4; i++ {
|
||
|
bytes[p+i] = ipb[i]
|
||
|
}
|
||
|
p += 4
|
||
|
|
||
|
// Bytes (p + 05) to (p + 06): Originating host response port
|
||
|
p += encodeUint16(mnode.port, bytes, p)
|
||
|
|
||
|
// Bytes (p + 07) to (p + 10): Originating message code
|
||
|
p += encodeUint32(mcode, bytes, p)
|
||
|
}
|
||
|
|
||
|
if m.broadcast != nil {
|
||
|
bbytes := m.broadcast.encode()
|
||
|
for i, v := range bbytes {
|
||
|
bytes[p+i] = v
|
||
|
}
|
||
|
}
|
||
|
|
||
|
checksum := adler32.Checksum(bytes[4:])
|
||
|
encodeUint32(checksum, bytes, 0)
|
||
|
|
||
|
return bytes
|
||
|
}
|
||
|
|
||
|
// If members exist on this message, and that message has the "forward to"
|
||
|
// status, this function returns it; otherwise it returns nil.
|
||
|
func (m *message) getForwardTo() *messageMember {
|
||
|
if len(m.members) > 0 && m.members[0].status == StatusForwardTo {
|
||
|
return m.members[0]
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Parses the bytes received in a UDP message.
|
||
|
// If the address:port from the message can't be associated with a known
|
||
|
// (live) node, then an instance of message.sender will be created from
|
||
|
// available data but not explicitly added to the known nodes.
|
||
|
func decodeMessage(sourceIP net.IP, bytes []byte) (message, error) {
|
||
|
var err error
|
||
|
|
||
|
// An index pointer
|
||
|
p := 0
|
||
|
|
||
|
// Bytes 00-03 Checksum (32-bit)
|
||
|
checksumStated, p := decodeUint32(bytes, p)
|
||
|
checksumCalculated := adler32.Checksum(bytes[4:])
|
||
|
if checksumCalculated != checksumStated {
|
||
|
return newMessage(255, nil, 0),
|
||
|
errors.New("checksum failure from " + sourceIP.String())
|
||
|
}
|
||
|
|
||
|
// Byte 04
|
||
|
// Rightmost 2 bits: verb (one of {P|A|F|N})
|
||
|
// Leftmost 6 bits: number of members in payload
|
||
|
v, p := decodeByte(bytes, p)
|
||
|
verb := messageVerb(v & 0x03)
|
||
|
|
||
|
memberCount := int(v >> 2)
|
||
|
|
||
|
// Bytes 05-06 Sender response port
|
||
|
senderPort, p := decodeUint16(bytes, p)
|
||
|
|
||
|
// Bytes 07-10 Sender ID Code
|
||
|
senderHeartbeat, p := decodeUint32(bytes, p)
|
||
|
|
||
|
// Now that we have the IP and port, we can find the Node.
|
||
|
sender := knownNodes.getByIP(sourceIP.To4(), senderPort)
|
||
|
|
||
|
// We don't know this node, so create a new one!
|
||
|
if sender == nil {
|
||
|
sender, _ = CreateNodeByIP(sourceIP.To4(), senderPort)
|
||
|
}
|
||
|
|
||
|
// Now that we have the verb, node, and code, we can build the mesage
|
||
|
m := newMessage(verb, sender, senderHeartbeat)
|
||
|
|
||
|
memberLastIndex := p + (memberCount * 11)
|
||
|
if len(bytes) > p {
|
||
|
m.members = decodeMembers(memberCount, bytes[p:memberLastIndex])
|
||
|
}
|
||
|
|
||
|
if len(bytes) > memberLastIndex {
|
||
|
m.broadcast, err = decodeBroadcast(bytes[memberLastIndex:])
|
||
|
|
||
|
if m.broadcast.origin.IP()[0] == 0 || m.broadcast.origin.Port() == 0 {
|
||
|
err = errors.New("Received originless broadcast!")
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return m, err
|
||
|
}
|
||
|
|
||
|
func decodeMembers(memberCount int, bytes []byte) []*messageMember {
|
||
|
// Bytes 00 Member status byte
|
||
|
// Bytes 01-04 Member host IP
|
||
|
// Bytes 05-06 Member host response port
|
||
|
// Bytes 07-10 Member heartbeat
|
||
|
|
||
|
members := make([]*messageMember, 0, 1)
|
||
|
|
||
|
// An index pointer
|
||
|
p := 0
|
||
|
|
||
|
for p < len(bytes) {
|
||
|
var mstatus NodeStatus
|
||
|
var mip net.IP
|
||
|
var mport uint16
|
||
|
var mcode uint32
|
||
|
var mnode *Node
|
||
|
|
||
|
// Byte 00 Member status byte
|
||
|
mstatus = NodeStatus(bytes[p])
|
||
|
p++
|
||
|
|
||
|
// Bytes 01-04 member IP
|
||
|
if bytes[p] > 0 {
|
||
|
mip = net.IPv4(
|
||
|
bytes[p+0],
|
||
|
bytes[p+1],
|
||
|
bytes[p+2],
|
||
|
bytes[p+3]).To4()
|
||
|
}
|
||
|
p += 4
|
||
|
|
||
|
// Bytes 05-06 member response port
|
||
|
mport, p = decodeUint16(bytes, p)
|
||
|
|
||
|
// Bytes 07-10 member heartbeat
|
||
|
mcode, p = decodeUint32(bytes, p)
|
||
|
|
||
|
if len(mip) > 0 {
|
||
|
// Find the sender by the address associated with the message
|
||
|
mnode = knownNodes.getByIP(mip, mport)
|
||
|
|
||
|
// We still don't know this node, so create a new one!
|
||
|
if mnode == nil {
|
||
|
mnode, _ = CreateNodeByIP(mip, mport)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
member := messageMember{
|
||
|
heartbeat: mcode,
|
||
|
node: mnode,
|
||
|
status: mstatus,
|
||
|
}
|
||
|
|
||
|
members = append(members, &member)
|
||
|
}
|
||
|
|
||
|
return members
|
||
|
}
|