1
0
This repository has been archived on 2023-12-27. You can view files and clone it, but cannot push or open issues or pull requests.
beginning-go/vendor/github.com/clockworksoul/smudge/message.go

296 lines
7.6 KiB
Go
Raw Permalink Normal View History

/*
Copyright 2016 The Smudge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package smudge
import (
"errors"
"hash/adler32"
"net"
)
// Message contents
// ---[ Base message (11 bytes)]---
// Bytes 00-03 Checksum (32-bit)
// Bytes 04 Verb in the 2 low bits (one of {PING|ACK|PINGREQ|NFPING}); member count in the 6 high bits
// Bytes 05-06 Sender response port
// Bytes 07-10 Sender current heartbeat
// ---[ Per member (11 bytes)]---
// Bytes 00 Member status byte
// Bytes 01-04 Member host IP
// Bytes 05-06 Member host response port
// Bytes 07-10 Member heartbeat
// ---[ Per broadcast (1 allowed) (12+N bytes) ]---
// Bytes 00-03 Origin IP
// Bytes 04-05 Origin response port
// Bytes 06-09 Origin broadcast counter
// Bytes 10-11 Payload length (bytes)
// Bytes 12-NN Payload
// message is the in-memory form of one protocol message: the sender's
// identity and heartbeat, a verb, zero or more member status updates, and an
// optional broadcast. It is turned into wire bytes by encode and rebuilt from
// wire bytes by decodeMessage.
type message struct {
// Node the message originates from; its port is written into the header.
sender *Node
// Heartbeat value of the sender, written into the header.
senderHeartbeat uint32
// Message verb; only its 2 low bits fit into the encoded verb byte.
verb messageVerb
// Member status updates carried by this message (may be nil).
members []*messageMember
// Optional broadcast payload; nil when the message carries none.
broadcast *Broadcast
}
// messageMember is one "member" entry of a message: a node the sender knows
// about and whose state it reports to the downstream recipient.
type messageMember struct {
// Heartbeat value reported for the node.
heartbeat uint32
// The node being described. decodeMembers leaves this nil when the
// encoded IP starts with a zero byte.
node *Node
// Status reported for the node.
status NodeStatus
}
// newMessage builds a message value carrying the given verb, sender and
// sender heartbeat. The member list and the broadcast start out empty (nil).
func newMessage(verb messageVerb, sender *Node, senderHeartbeat uint32) message {
	var msg message
	msg.verb = verb
	msg.sender = sender
	msg.senderHeartbeat = senderHeartbeat
	return msg
}
// addBroadcast attaches a broadcast to this message. A message holds at most
// one broadcast: the field is simply overwritten, so a later call replaces
// whatever was set before, and passing nil removes it.
func (m *message) addBroadcast(broadcast *Broadcast) {
m.broadcast = broadcast
}
// addMember appends a member status update (node, status, heartbeat) to this
// message.
//
// At most 63 members (2^6 - 1) are accepted, because encode packs the member
// count into the 6 high bits of the verb byte. Once the list already holds 63
// entries the call leaves the message untouched and returns a
// "member list overflow" error; otherwise it returns nil.
func (m *message) addMember(n *Node, status NodeStatus, heartbeat uint32) error {
	switch {
	case m.members == nil:
		// First member: allocate the list with room for a typical message.
		m.members = make([]*messageMember, 0, 32)
	case len(m.members) >= 63:
		return errors.New("member list overflow")
	}

	entry := &messageMember{
		heartbeat: heartbeat,
		node:      n,
		status:    status,
	}
	m.members = append(m.members, entry)

	return nil
}
// encode serializes the message into its wire form and returns the bytes.
//
// Message contents
// ---[ Base message (11 bytes)]---
// Bytes 00-03 Checksum (32-bit Adler-32 of every byte after byte 03)
// Bytes 04    Rightmost 2 bits: verb; leftmost 6 bits: number of members
// Bytes 05-06 Sender response port
// Bytes 07-10 Sender heartbeat
// ---[ Per member (11 bytes)]---
// Bytes 00    Member status byte
// Bytes 01-04 Member host IP
// Bytes 05-06 Member host response port
// Bytes 07-10 Member heartbeat
// ---[ Optional broadcast ]---
// Whatever m.broadcast.encode() returns, appended verbatim.
//
// A member whose node is nil is written with an all-zero IP and port, which
// is the form decodeMembers turns back into a nil node. m.sender must not be
// nil.
func (m *message) encode() []byte {
	const (
		headerSize = 11 // checksum (4) + verb byte (1) + port (2) + heartbeat (4)
		memberSize = 11 // status (1) + IP (4) + port (2) + heartbeat (4)
	)

	// Encode the broadcast up front so the buffer is sized from the bytes
	// that will really be written, instead of duplicating the broadcast's
	// own size formula here.
	var bbytes []byte
	if m.broadcast != nil {
		bbytes = m.broadcast.encode()
	}

	size := headerSize + len(m.members)*memberSize + len(bbytes)
	bytes := make([]byte, size)

	// An index pointer (start at 4 to leave room for the checksum).
	p := 4

	// Byte 04
	// Rightmost 2 bits: verb (one of {P|A|F|N})
	// Leftmost 6 bits: number of members in payload
	verbByte := byte(len(m.members))<<2 | byte(m.verb)
	p += encodeByte(verbByte, bytes, p)

	// Bytes 05-06 Sender response port
	p += encodeUint16(m.sender.port, bytes, p)

	// Bytes 07-10 Sender heartbeat
	p += encodeUint32(m.senderHeartbeat, bytes, p)

	// Each member record requires 11 bytes.
	for _, member := range m.members {
		node := member.node

		// Record byte 00: status
		bytes[p] = byte(member.status)
		p++

		// Record bytes 01-04: host IP. Left as zeros when there is no node
		// (or no usable address) to describe.
		// NOTE(review): only the first four bytes of node.ip are written, so
		// this presumably expects the 4-byte IPv4 form — confirm.
		if node != nil && len(node.ip) >= 4 {
			for i := 0; i < 4; i++ {
				bytes[p+i] = node.ip[i]
			}
		}
		p += 4

		// Record bytes 05-06: host response port
		if node != nil {
			p += encodeUint16(node.port, bytes, p)
		} else {
			p += encodeUint16(0, bytes, p)
		}

		// Record bytes 07-10: member heartbeat
		p += encodeUint32(member.heartbeat, bytes, p)
	}

	// Optional broadcast, appended after the last member record.
	copy(bytes[p:], bbytes)

	// The checksum covers everything after its own four bytes, so it is
	// computed last and written at the front.
	checksum := adler32.Checksum(bytes[4:])
	encodeUint32(checksum, bytes, 0)

	return bytes
}
// getForwardTo returns the first member of this message when that member
// carries the StatusForwardTo status. It returns nil when the message has no
// members or the first member has any other status.
func (m *message) getForwardTo() *messageMember {
	if len(m.members) == 0 {
		return nil
	}

	first := m.members[0]
	if first.status != StatusForwardTo {
		return nil
	}

	return first
}
// decodeMessage parses the bytes received in a UDP message.
//
// Expected layout: an 11-byte header (4-byte checksum, verb/member-count
// byte, 2-byte sender port, 4-byte sender heartbeat), followed by one 11-byte
// record per advertised member, followed by an optional broadcast.
//
// If the address:port from the message can't be associated with a known
// node, then an instance of message.sender will be created from the
// available data but not explicitly added to the known nodes.
//
// The input comes straight off the network, so every length is validated
// before it is used. A datagram that is shorter than the header, fails its
// checksum, or advertises more members than it contains yields a placeholder
// message (verb 255, nil sender) together with a non-nil error. A broadcast
// that fails to decode, or that has no origin, yields the decoded message
// together with a non-nil error.
func decodeMessage(sourceIP net.IP, bytes []byte) (message, error) {
	const (
		headerSize = 11 // checksum (4) + verb byte (1) + port (2) + heartbeat (4)
		memberSize = 11 // status (1) + IP (4) + port (2) + heartbeat (4)
	)

	var err error

	// Reject anything too short to hold a header; slicing and decoding
	// below would otherwise run off the end of the buffer.
	if len(bytes) < headerSize {
		return newMessage(255, nil, 0),
			errors.New("truncated message from " + sourceIP.String())
	}

	// An index pointer
	p := 0

	// Bytes 00-03 Checksum (32-bit), computed over everything after it.
	checksumStated, p := decodeUint32(bytes, p)
	checksumCalculated := adler32.Checksum(bytes[4:])
	if checksumCalculated != checksumStated {
		return newMessage(255, nil, 0),
			errors.New("checksum failure from " + sourceIP.String())
	}

	// Byte 04
	// Rightmost 2 bits: verb (one of {P|A|F|N})
	// Leftmost 6 bits: number of members in payload
	v, p := decodeByte(bytes, p)
	verb := messageVerb(v & 0x03)
	memberCount := int(v >> 2)

	// Bytes 05-06 Sender response port
	senderPort, p := decodeUint16(bytes, p)

	// Bytes 07-10 Sender heartbeat
	senderHeartbeat, p := decodeUint32(bytes, p)

	// Now that we have the IP and port, we can find the Node.
	// NOTE(review): To4 returns nil for a non-IPv4 source address; the
	// lookup and creation below receive that nil as-is — confirm intended.
	sender := knownNodes.getByIP(sourceIP.To4(), senderPort)

	// We don't know this node, so create a new one.
	if sender == nil {
		// NOTE(review): the second result of CreateNodeByIP is discarded,
		// as in the original; sender may stay nil if creation fails.
		sender, _ = CreateNodeByIP(sourceIP.To4(), senderPort)
	}

	// Now that we have the verb, node, and heartbeat, we can build the message.
	m := newMessage(verb, sender, senderHeartbeat)

	// The member count is sender-controlled; make sure the records it
	// promises are really present before slicing them out.
	memberLastIndex := p + memberCount*memberSize
	if memberLastIndex > len(bytes) {
		return newMessage(255, nil, 0),
			errors.New("truncated member list from " + sourceIP.String())
	}

	if len(bytes) > p {
		m.members = decodeMembers(memberCount, bytes[p:memberLastIndex])
	}

	// Anything after the member records is the broadcast.
	if len(bytes) > memberLastIndex {
		m.broadcast, err = decodeBroadcast(bytes[memberLastIndex:])

		// Only inspect the origin of a broadcast that actually decoded;
		// this keeps the decode error intact and avoids a nil dereference.
		if err == nil && m.broadcast != nil {
			origin := m.broadcast.origin
			if ip := origin.IP(); len(ip) == 0 || ip[0] == 0 || origin.Port() == 0 {
				err = errors.New("Received originless broadcast!")
			}
		}
	}

	return m, err
}
// decodeMembers decodes consecutive 11-byte member records from bytes and
// returns one messageMember per complete record.
//
// Record layout:
// Bytes 00    Member status byte
// Bytes 01-04 Member host IP
// Bytes 05-06 Member host response port
// Bytes 07-10 Member heartbeat
//
// The number of records is taken from len(bytes). memberCount is kept for
// interface compatibility and is not consulted. Trailing bytes that do not
// form a complete record are ignored rather than read past the end of the
// slice. A record whose first IP byte is zero produces a member with a nil
// node. The returned slice is never nil.
func decodeMembers(memberCount int, bytes []byte) []*messageMember {
	const memberSize = 11 // status (1) + IP (4) + port (2) + heartbeat (4)

	members := make([]*messageMember, 0, len(bytes)/memberSize)

	// An index pointer
	p := 0

	// Only enter the loop while a whole record remains.
	for p+memberSize <= len(bytes) {
		var mip net.IP
		var mport uint16
		var mcode uint32
		var mnode *Node

		// Byte 00 Member status byte
		mstatus := NodeStatus(bytes[p])
		p++

		// Bytes 01-04 member IP; a leading zero byte means "no address".
		if bytes[p] > 0 {
			mip = net.IPv4(bytes[p], bytes[p+1], bytes[p+2], bytes[p+3]).To4()
		}
		p += 4

		// Bytes 05-06 member response port
		mport, p = decodeUint16(bytes, p)

		// Bytes 07-10 member heartbeat
		mcode, p = decodeUint32(bytes, p)

		if len(mip) > 0 {
			// Find the node by the address carried in the record.
			mnode = knownNodes.getByIP(mip, mport)

			// We still don't know this node, so create a new one.
			if mnode == nil {
				// NOTE(review): the second result of CreateNodeByIP is
				// discarded, as in the original.
				mnode, _ = CreateNodeByIP(mip, mport)
			}
		}

		members = append(members, &messageMember{
			heartbeat: mcode,
			node:      mnode,
			status:    mstatus,
		})
	}

	return members
}