diff --git a/v2/p2p/p2p.go b/v2/p2p/p2p.go index a362a09..9e2ec5e 100644 --- a/v2/p2p/p2p.go +++ b/v2/p2p/p2p.go @@ -140,19 +140,20 @@ func (p *P2P) PeerIDs() []PeerID { /* Message Handling */ // Publish sends a message to the specified peer's message queue. -func (p *P2P) Publish(id PeerID, msg string, protocol ProtocolFunc, params map[string]any) error { +func (p *P2P) Publish(id PeerID, msg string, protocol ProtocolFunc, staticParams, dynamicParams map[string]any) error { if peer, ok := p.peers[id]; ok { if !peer.alive { return fmt.Errorf("peer %s is not alive", id) } peer.msgQueue <- Message{ - Publisher: id, - From: id, - Content: msg, - Protocol: protocol, - Params: params, - HopCount: 0, + Publisher: id, + From: id, + Content: msg, + Protocol: protocol, + StaticParams: staticParams, + DynamicParams: dynamicParams, + HopCount: 0, } return nil } diff --git a/v2/p2p/peer.go b/v2/p2p/peer.go index a69102d..f4eea8e 100644 --- a/v2/p2p/peer.go +++ b/v2/p2p/peer.go @@ -130,8 +130,12 @@ func (p *peer) eachPublish(network *P2P, msg Message, start time.Time) { protocol := msg.Protocol hopCount := msg.HopCount + delay := time.Duration(p.processingLatency * float64(time.Millisecond)) + if remain := delay - time.Since(start); remain > 0 { + time.Sleep(remain) + } + p.mu.Lock() - defer p.mu.Unlock() if _, ok := p.sentTo[content]; !ok { p.sentTo[content] = make(map[PeerID]struct{}) @@ -157,7 +161,11 @@ func (p *peer) eachPublish(network *P2P, msg Message, start time.Time) { receivedEdges = append(receivedEdges, senderID) } - targets := msg.Protocol(p.id, msg, allEdges, sentEdges, receivedEdges, msg.Params) + targets, dynamicParams := msg.Protocol(p.id, msg, allEdges, sentEdges, receivedEdges, msg.StaticParams, msg.DynamicParams) + + if dynamicParams == nil { + dynamicParams = make(map[PeerID]map[string]any) + } for _, targetID := range *targets { for _, edge := range p.edges { @@ -168,11 +176,6 @@ func (p *peer) eachPublish(network *P2P, msg Message, start time.Time) { } } - delay := time.Duration(p.processingLatency * float64(time.Millisecond)) - if remain := delay - time.Since(start); remain > 0 { - time.Sleep(remain) - } - for _, e := range willSendEdges { edgeCopy := e p.sentTo[content][e.targetID] = struct{}{} @@ -191,23 +194,38 @@ func (p *peer) eachPublish(network *P2P, msg Message, start time.Time) { }) go func(e edge) { - time.Sleep(time.Duration(e.networkLatency) * time.Millisecond) + time.Sleep(time.Duration(e.networkLatency * float64(time.Millisecond))) targetPeer, ok := network.peers[e.targetID] - if !ok || targetPeer == nil { + if !ok || targetPeer == nil || !targetPeer.alive { return } + var dynamics map[string]any + + if _, ok := dynamicParams[e.targetID]; !ok { + dynamics = nil + } else { + dynamics = make(map[string]any) + + for k, v := range dynamicParams[e.targetID] { + dynamics[k] = v + } + } + network.peers[e.targetID].msgQueue <- Message{ - Publisher: msg.Publisher, - From: p.id, - Content: content, - Protocol: protocol, - HopCount: hopCount + 1, - Params: msg.Params, + Publisher: msg.Publisher, + From: p.id, + Content: content, + Protocol: protocol, + HopCount: hopCount + 1, + StaticParams: msg.StaticParams, + DynamicParams: dynamics, } }(edgeCopy) } + + p.mu.Unlock() } // eachStop marks the peer as inactive and closes its message queue. diff --git a/v2/p2p/protocol.go b/v2/p2p/protocol.go index 8ffa8bd..6b7f380 100644 --- a/v2/p2p/protocol.go +++ b/v2/p2p/protocol.go @@ -7,22 +7,23 @@ import ( // Message represents a message sent between nodes in the P2P network. type Message struct { - Publisher PeerID // ID of the peer that originally published the message - From PeerID // ID of the peer that sent the message to the current peer - Content string // the actual content of the message - HopCount int // the number of hops the message has taken from the publisher to the current peer - Protocol ProtocolFunc // the protocol function that determines how the message should be processed and forwarded - Params map[string]any // additional parameters for the protocol function + Publisher PeerID // ID of the peer that originally published the message + From PeerID // ID of the peer that sent the message to the current peer + Content string // the actual content of the message + HopCount int // the number of hops the message has taken from the publisher to the current peer + Protocol ProtocolFunc // the protocol function that determines how the message should be processed and forwarded + StaticParams map[string]any // additional parameters for the protocol function + DynamicParams map[string]any // additional parameters that can change during message processing } // ProtocolFunc defines the function signature for custom protocols in the P2P network. // It takes the current peer's ID, the message being processed, a list of neighbor IDs, -// lists of peers the message has been sent to and received from, and any additional broadcast parameters. -// It returns a pointer to a slice of PeerIDs that the message should be forwarded to. -type ProtocolFunc func(id PeerID, msg Message, neighbors []PeerID, sentPeers []PeerID, receivedPeers []PeerID, broadcastParams map[string]any) *[]PeerID +// lists of peers the message has been sent to and received from, and any additional static parameters. +// It returns a pointer to a slice of PeerIDs that the message should be forwarded to, along with any dynamic parameters. +type ProtocolFunc func(id PeerID, msg Message, neighbors []PeerID, sentPeers []PeerID, receivedPeers []PeerID, staticParams, dynamicParams map[string]any) (*[]PeerID, map[PeerID]map[string]any) // Flooding is a simple broadcast protocol where each peer forwards the message to all its neighbors except those it has already sent to or received from. -var Flooding ProtocolFunc = func(id PeerID, msg Message, neighbors []PeerID, sentPeers []PeerID, receivedPeers []PeerID, broadcastParams map[string]any) *[]PeerID { +var Flooding ProtocolFunc = func(id PeerID, msg Message, neighbors []PeerID, sentPeers []PeerID, receivedPeers []PeerID, staticParams, dynamicParams map[string]any) (*[]PeerID, map[PeerID]map[string]any) { targets := make([]PeerID, 0) for _, neighbor := range neighbors { if slices.Contains(sentPeers, neighbor) { @@ -35,11 +36,11 @@ var Flooding ProtocolFunc = func(id PeerID, msg Message, neighbors []PeerID, sen targets = append(targets, neighbor) } - return &targets + return &targets, nil } // Gossip is a broadcast protocol where each peer forwards the message to a random subset of its neighbors, determined by the gossip factor. -var Gossip ProtocolFunc = func(id PeerID, msg Message, neighbors []PeerID, sentPeers []PeerID, receivedPeers []PeerID, broadcastParams map[string]any) *[]PeerID { +var Gossip ProtocolFunc = func(id PeerID, msg Message, neighbors []PeerID, sentPeers []PeerID, receivedPeers []PeerID, staticParams, dynamicParams map[string]any) (*[]PeerID, map[PeerID]map[string]any) { targets := make([]PeerID, 0) for _, neighbor := range neighbors { if slices.Contains(sentPeers, neighbor) { @@ -52,8 +53,8 @@ var Gossip ProtocolFunc = func(id PeerID, msg Message, neighbors []PeerID, sentP targets = append(targets, neighbor) } - gossipFactor, ok1 := broadcastParams["gossip_factor"].(float64) - gossipNode, ok2 := broadcastParams["gossip_node"].(int) + gossipFactor, ok1 := staticParams["gossip_factor"].(float64) + gossipNode, ok2 := staticParams["gossip_node"].(int) if !ok1 && !ok2 { ok1 = true @@ -70,9 +71,13 @@ var Gossip ProtocolFunc = func(id PeerID, msg Message, neighbors []PeerID, sentP targets = targets[:k] } else if ok2 { k := gossipNode + if k > len(targets) { + k = len(targets) + } + targets = targets[:k] } } - return &targets + return &targets, nil }