Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions v2/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,19 +140,20 @@ func (p *P2P) PeerIDs() []PeerID {
/* Message Handling */

// Publish sends a message to the specified peer's message queue.
func (p *P2P) Publish(id PeerID, msg string, protocol ProtocolFunc, params map[string]any) error {
func (p *P2P) Publish(id PeerID, msg string, protocol ProtocolFunc, staticParams, dynamicParams map[string]any) error {
if peer, ok := p.peers[id]; ok {
if !peer.alive {
return fmt.Errorf("peer %s is not alive", id)
}

peer.msgQueue <- Message{
Publisher: id,
From: id,
Content: msg,
Protocol: protocol,
Params: params,
HopCount: 0,
Publisher: id,
From: id,
Content: msg,
Protocol: protocol,
StaticParams: staticParams,
DynamicParams: dynamicParams,
HopCount: 0,
}
return nil
}
Expand Down
48 changes: 33 additions & 15 deletions v2/p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,12 @@ func (p *peer) eachPublish(network *P2P, msg Message, start time.Time) {
protocol := msg.Protocol
hopCount := msg.HopCount

delay := time.Duration(p.processingLatency * float64(time.Millisecond))
if remain := delay - time.Since(start); remain > 0 {
time.Sleep(remain)
}

p.mu.Lock()
defer p.mu.Unlock()

if _, ok := p.sentTo[content]; !ok {
p.sentTo[content] = make(map[PeerID]struct{})
Expand All @@ -157,7 +161,11 @@ func (p *peer) eachPublish(network *P2P, msg Message, start time.Time) {
receivedEdges = append(receivedEdges, senderID)
}

targets := msg.Protocol(p.id, msg, allEdges, sentEdges, receivedEdges, msg.Params)
targets, dynamicParams := msg.Protocol(p.id, msg, allEdges, sentEdges, receivedEdges, msg.StaticParams, msg.DynamicParams)

if dynamicParams == nil {
dynamicParams = make(map[PeerID]map[string]any)
}

for _, targetID := range *targets {
for _, edge := range p.edges {
Expand All @@ -168,11 +176,6 @@ func (p *peer) eachPublish(network *P2P, msg Message, start time.Time) {
}
}

delay := time.Duration(p.processingLatency * float64(time.Millisecond))
if remain := delay - time.Since(start); remain > 0 {
time.Sleep(remain)
}

for _, e := range willSendEdges {
edgeCopy := e
p.sentTo[content][e.targetID] = struct{}{}
Expand All @@ -191,23 +194,38 @@ func (p *peer) eachPublish(network *P2P, msg Message, start time.Time) {
})

go func(e edge) {
time.Sleep(time.Duration(e.networkLatency) * time.Millisecond)
time.Sleep(time.Duration(e.networkLatency * float64(time.Millisecond)))

targetPeer, ok := network.peers[e.targetID]
if !ok || targetPeer == nil {
if !ok || targetPeer == nil || !targetPeer.alive {
return
}

var dynamics map[string]any

if _, ok := dynamicParams[e.targetID]; !ok {
dynamics = nil
} else {
dynamics = make(map[string]any)

for k, v := range dynamicParams[e.targetID] {
dynamics[k] = v
}
}

network.peers[e.targetID].msgQueue <- Message{
Publisher: msg.Publisher,
From: p.id,
Content: content,
Protocol: protocol,
HopCount: hopCount + 1,
Params: msg.Params,
Publisher: msg.Publisher,
From: p.id,
Content: content,
Protocol: protocol,
HopCount: hopCount + 1,
StaticParams: msg.StaticParams,
DynamicParams: dynamics,
}
}(edgeCopy)
}

p.mu.Unlock()
}

// eachStop marks the peer as inactive and closes its message queue.
Expand Down
35 changes: 20 additions & 15 deletions v2/p2p/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,23 @@ import (

// Message represents a message sent between nodes in the P2P network.
type Message struct {
Publisher PeerID // ID of the peer that originally published the message
From PeerID // ID of the peer that sent the message to the current peer
Content string // the actual content of the message
HopCount int // the number of hops the message has taken from the publisher to the current peer
Protocol ProtocolFunc // the protocol function that determines how the message should be processed and forwarded
Params map[string]any // additional parameters for the protocol function
Publisher PeerID // ID of the peer that originally published the message
From PeerID // ID of the peer that sent the message to the current peer
Content string // the actual content of the message
HopCount int // the number of hops the message has taken from the publisher to the current peer
Protocol ProtocolFunc // the protocol function that determines how the message should be processed and forwarded
StaticParams map[string]any // additional parameters for the protocol function
DynamicParams map[string]any // additional parameters that can change during message processing
}

// ProtocolFunc defines the function signature for custom protocols in the P2P network.
// It takes the current peer's ID, the message being processed, a list of neighbor IDs,
// lists of peers the message has been sent to and received from, and any additional broadcast parameters.
// It returns a pointer to a slice of PeerIDs that the message should be forwarded to.
type ProtocolFunc func(id PeerID, msg Message, neighbors []PeerID, sentPeers []PeerID, receivedPeers []PeerID, broadcastParams map[string]any) *[]PeerID
// lists of peers the message has been sent to and received from, and any additional static parameters.
// It returns a pointer to a slice of PeerIDs that the message should be forwarded to, along with any dynamic parameters.
type ProtocolFunc func(id PeerID, msg Message, neighbors []PeerID, sentPeers []PeerID, receivedPeers []PeerID, staticParams, dynamicParams map[string]any) (*[]PeerID, map[PeerID]map[string]any)

// Flooding is a simple broadcast protocol where each peer forwards the message to all its neighbors except those it has already sent to or received from.
var Flooding ProtocolFunc = func(id PeerID, msg Message, neighbors []PeerID, sentPeers []PeerID, receivedPeers []PeerID, broadcastParams map[string]any) *[]PeerID {
var Flooding ProtocolFunc = func(id PeerID, msg Message, neighbors []PeerID, sentPeers []PeerID, receivedPeers []PeerID, staticParams, dynamicParams map[string]any) (*[]PeerID, map[PeerID]map[string]any) {
targets := make([]PeerID, 0)
for _, neighbor := range neighbors {
if slices.Contains(sentPeers, neighbor) {
Expand All @@ -35,11 +36,11 @@ var Flooding ProtocolFunc = func(id PeerID, msg Message, neighbors []PeerID, sen
targets = append(targets, neighbor)
}

return &targets
return &targets, nil
}

// Gossip is a broadcast protocol where each peer forwards the message to a random subset of its neighbors, determined by the gossip factor.
var Gossip ProtocolFunc = func(id PeerID, msg Message, neighbors []PeerID, sentPeers []PeerID, receivedPeers []PeerID, broadcastParams map[string]any) *[]PeerID {
var Gossip ProtocolFunc = func(id PeerID, msg Message, neighbors []PeerID, sentPeers []PeerID, receivedPeers []PeerID, staticParams, dynamicParams map[string]any) (*[]PeerID, map[PeerID]map[string]any) {
targets := make([]PeerID, 0)
for _, neighbor := range neighbors {
if slices.Contains(sentPeers, neighbor) {
Expand All @@ -52,8 +53,8 @@ var Gossip ProtocolFunc = func(id PeerID, msg Message, neighbors []PeerID, sentP
targets = append(targets, neighbor)
}

gossipFactor, ok1 := broadcastParams["gossip_factor"].(float64)
gossipNode, ok2 := broadcastParams["gossip_node"].(int)
gossipFactor, ok1 := staticParams["gossip_factor"].(float64)
gossipNode, ok2 := staticParams["gossip_node"].(int)

if !ok1 && !ok2 {
ok1 = true
Expand All @@ -70,9 +71,13 @@ var Gossip ProtocolFunc = func(id PeerID, msg Message, neighbors []PeerID, sentP
targets = targets[:k]
} else if ok2 {
k := gossipNode
if k > len(targets) {
k = len(targets)
}

targets = targets[:k]
}
}

return &targets
return &targets, nil
}
Loading