From c724637915c57b8d4c4b476658725f01f9e76f1d Mon Sep 17 00:00:00 2001 From: redfox <8564093@qq.com> Date: Wed, 13 May 2026 11:29:16 +0800 Subject: [PATCH 1/6] =?UTF-8?q?feat(examples):=20=E6=B7=BB=E5=8A=A0C10K?= =?UTF-8?q?=E5=B9=B6=E5=8F=91=E5=8E=8B=E6=B5=8B=E7=A4=BA=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/c10k_test/Makefile | 8 + examples/c10k_test/bin/conf/zinx.json | 12 ++ examples/c10k_test/client/client.go | 250 ++++++++++++++++++++++++++ examples/c10k_test/server/main.go | 122 +++++++++++++ 4 files changed, 392 insertions(+) create mode 100644 examples/c10k_test/Makefile create mode 100644 examples/c10k_test/bin/conf/zinx.json create mode 100644 examples/c10k_test/client/client.go create mode 100644 examples/c10k_test/server/main.go diff --git a/examples/c10k_test/Makefile b/examples/c10k_test/Makefile new file mode 100644 index 00000000..171f8731 --- /dev/null +++ b/examples/c10k_test/Makefile @@ -0,0 +1,8 @@ +.PHONY: build clean + +build: + go build -o bin/server ./server/ + go build -o bin/client ./client/ + +clean: + rm -rf bin/ diff --git a/examples/c10k_test/bin/conf/zinx.json b/examples/c10k_test/bin/conf/zinx.json new file mode 100644 index 00000000..aa40b03b --- /dev/null +++ b/examples/c10k_test/bin/conf/zinx.json @@ -0,0 +1,12 @@ +{ + "Name": "Zinx-Standard-App", + "Mode": "tcp", + "Host": "0.0.0.0", + "TcpPort": 8888, + "MaxConn": 10000, + "WorkerPoolSize": 10, + "MaxWorkerTaskLen": 1024, + "MaxPacketSize": 4096, + "LogFile": "zinx.log", + "LogLevel": "error" +} \ No newline at end of file diff --git a/examples/c10k_test/client/client.go b/examples/c10k_test/client/client.go new file mode 100644 index 00000000..e348bb20 --- /dev/null +++ b/examples/c10k_test/client/client.go @@ -0,0 +1,250 @@ +package main + +import ( + "bytes" + "encoding/binary" + "fmt" + "math" + "math/rand" + "net" + "sort" + "strings" + "sync" + "time" +) + +type Packet struct { + A int32 + B int32 + C int32 +} + +type Stats struct { + success int64 + failed int64 + respTimes []float64 + respMu sync.Mutex + histBuckets []int64 + histMu sync.RWMutex +} + +const ( + histBucketCount = 20 +) + +func NewStats() *Stats { + return &Stats{ + respTimes: make([]float64, 0, 100000), + histBuckets: make([]int64, histBucketCount), + } +} + +func (s *Stats) addSuccess(n int64) { + s.respMu.Lock() + s.success += n + s.respMu.Unlock() +} + +func (s *Stats) addFailed(n int64) { + s.respMu.Lock() + s.failed += n + s.respMu.Unlock() +} + +func (s *Stats) addRespTime(ms float64) { + s.respMu.Lock() + s.respTimes = append(s.respTimes, ms) + s.respMu.Unlock() + + bucket := int(ms / 50) + if bucket >= histBucketCount { + bucket = histBucketCount - 1 + } + s.histMu.Lock() + s.histBuckets[bucket]++ + s.histMu.Unlock() +} + +func (s *Stats) getPercentile(p float64) float64 { + s.respMu.Lock() + defer s.respMu.Unlock() + + if len(s.respTimes) == 0 { + return 0 + } + + index := int(math.Ceil(float64(len(s.respTimes))*p)) - 1 + if index < 0 { + index = 0 + } + if index >= len(s.respTimes) { + index = len(s.respTimes) - 1 + } + + sorted := make([]float64, len(s.respTimes)) + copy(sorted, s.respTimes) + sort.Float64s(sorted) + return sorted[index] +} + +func (s *Stats) getAvg() float64 { + s.respMu.Lock() + defer s.respMu.Unlock() + + if len(s.respTimes) == 0 { + return 0 + } + + var sum float64 + for _, t := range s.respTimes { + sum += t + } + return sum / float64(len(s.respTimes)) +} + +func (s *Stats) printHistogram() { + s.histMu.RLock() + defer s.histMu.RUnlock() + + fmt.Println("\n响应时间分布 (ms):") + fmt.Println(strings.Repeat("-", 50)) + + var total int64 + for _, v := range s.histBuckets { + total += v + } + + maxBucket := int64(0) + for _, v := range s.histBuckets { + if v > maxBucket { + maxBucket = v + } + } + + for i := 0; i < histBucketCount; i++ { + low := i * 50 + high := (i + 1) * 50 + count := s.histBuckets[i] + percentage := float64(count) / float64(total) * 100 + + barLen := 0 + if maxBucket > 0 { + barLen = int(float64(count) / float64(maxBucket) * 40) + } + bar := strings.Repeat("█", barLen) + + fmt.Printf("%4d-%4dms: %8d (%5.2f%%) %s\n", low, high, count, percentage, bar) + } + fmt.Println(strings.Repeat("-", 50)) +} + +func sendMsg(conn net.Conn, msgID uint32, data []byte) error { + header := make([]byte, 8) + binary.BigEndian.PutUint32(header[0:4], msgID) + binary.BigEndian.PutUint32(header[4:8], uint32(len(data))) + _, err := conn.Write(append(header, data...)) + return err +} + +func recvMsg(conn net.Conn) (uint32, []byte, error) { + header := make([]byte, 8) + if _, err := conn.Read(header); err != nil { + return 0, nil, err + } + msgID := binary.BigEndian.Uint32(header[0:4]) + dataLen := binary.BigEndian.Uint32(header[4:8]) + body := make([]byte, dataLen) + if _, err := conn.Read(body); err != nil { + return 0, nil, err + } + return msgID, body, nil +} + +func runClient(clientID int, host string, port int, repeat int, sem chan struct{}, stats *Stats, wg *sync.WaitGroup) { + defer wg.Done() + + sem <- struct{}{} + defer func() { <-sem }() + + conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + stats.addFailed(int64(repeat)) + return + } + defer conn.Close() + + for i := 0; i < repeat; i++ { + pkt := Packet{ + A: int32(rand.Intn(99999) + 1), + B: int32(rand.Intn(99999) + 1), + C: 0, + } + + var buf bytes.Buffer + if err := binary.Write(&buf, binary.BigEndian, pkt); err != nil { + stats.addFailed(1) + continue + } + data := buf.Bytes() + + sendTime := time.Now() + + if err := sendMsg(conn, 1001, data); err != nil { + stats.addFailed(1) + continue + } + + if _, _, err := recvMsg(conn); err != nil { + stats.addFailed(1) + } else { + respTime := time.Since(sendTime).Seconds() * 1000 + stats.addSuccess(1) + stats.addRespTime(respTime) + } + + //time.Sleep(time.Millisecond) + } +} + +func main() { + serverHost := "localhost" + serverPort := 8888 + totalConns := 10000 + repeatPerConn := 1000 + + fmt.Printf("🚀 启动压测: %d 并发连接, 每个连接请求 %d 次\n", totalConns, repeatPerConn) + startTime := time.Now() + + stats := NewStats() + sem := make(chan struct{}, 10000) + var wg sync.WaitGroup + + wg.Add(totalConns) + for i := 0; i < totalConns; i++ { + go runClient(i, serverHost, serverPort, repeatPerConn, sem, stats, &wg) + } + + wg.Wait() + duration := time.Since(startTime) + + qps := float64(stats.success) / duration.Seconds() + + fmt.Println("\n" + strings.Repeat("=", 50)) + fmt.Printf("🏁 压测报告\n") + fmt.Printf("总耗时: %.2f 秒\n", duration.Seconds()) + fmt.Printf("成功次数: %d\n", stats.success) + fmt.Printf("失败次数: %d\n", stats.failed) + fmt.Printf("有效 QPS: %.2f\n", qps) + fmt.Println(strings.Repeat("-", 50)) + fmt.Printf("响应时间 (ms):\n") + fmt.Printf(" 平均: %.2f\n", stats.getAvg()) + fmt.Printf(" 最小: %.2f\n", stats.getPercentile(0)) + fmt.Printf(" P50: %.2f\n", stats.getPercentile(0.50)) + fmt.Printf(" P90: %.2f\n", stats.getPercentile(0.90)) + fmt.Printf(" P95: %.2f\n", stats.getPercentile(0.95)) + fmt.Printf(" P99: %.2f\n", stats.getPercentile(0.99)) + fmt.Printf(" 最大: %.2f\n", stats.getPercentile(1.0)) + fmt.Println(strings.Repeat("=", 50)) + + stats.printHistogram() +} diff --git a/examples/c10k_test/server/main.go b/examples/c10k_test/server/main.go new file mode 100644 index 00000000..b2584a4c --- /dev/null +++ b/examples/c10k_test/server/main.go @@ -0,0 +1,122 @@ +package main + +import ( + "encoding/binary" + "fmt" + "runtime" + "sync" + "time" + + "github.com/aceld/zinx/zconf" + "github.com/aceld/zinx/ziface" + "github.com/aceld/zinx/zlog" + "github.com/aceld/zinx/znet" +) + +// Packet 数据结构 +type Packet struct { + A int32 + B int32 + C int32 +} + +// Encode 将 Packet 编码为字节数组 +func (p *Packet) Encode() []byte { + buf := make([]byte, 12) // 3 * 4 bytes + binary.BigEndian.PutUint32(buf[0:4], uint32(p.A)) + binary.BigEndian.PutUint32(buf[4:8], uint32(p.B)) + binary.BigEndian.PutUint32(buf[8:12], uint32(p.C)) + return buf +} + +// Decode 将字节数组解码为 Packet +func Decode(buf []byte) *Packet { + if len(buf) < 12 { + return nil + } + return &Packet{ + A: int32(binary.BigEndian.Uint32(buf[0:4])), + B: int32(binary.BigEndian.Uint32(buf[4:8])), + C: int32(binary.BigEndian.Uint32(buf[8:12])), + } +} + +var ( + connCount int32 + mutex sync.Mutex +) + +func OnConnStart(conn ziface.IConnection) { + mutex.Lock() + connCount++ + mutex.Unlock() + //logger.Info("Client connected", "conn_id", conn.GetConnID(), "addr", conn.RemoteAddrString(), "total", connCount) +} + +func OnConnStop(conn ziface.IConnection) { + mutex.Lock() + connCount-- + mutex.Unlock() + //logger.Info("Client disconnected", "conn_id", conn.GetConnID(), "addr", conn.RemoteAddrString(), "total", connCount) +} + +// CalculateRouter 计算路由 +type CalculateRouter struct{} + +func (c *CalculateRouter) PreHandle(request ziface.IRequest) { +} + +func (c *CalculateRouter) Handle(request ziface.IRequest) { + // 获取消息数据 + data := request.GetData() + conn := request.GetConnection() + + // 解码 Packet + packet := Decode(data) + if packet == nil { + zlog.Ins().ErrorF("Invalid packet data from %s", conn.RemoteAddr().String()) + return + } + + // 计算 C = A + B + packet.C = packet.A + packet.B + + zlog.Ins().DebugF("Received from %s: A=%d, B=%d, Calculated C=%d", + conn.RemoteAddr().String(), packet.A, packet.B, packet.C) + + // 使用 SendMsg 回复(异步方式) + err := conn.SendMsg(1, packet.Encode()) + if err != nil { + zlog.Ins().ErrorF("SendMsg error: %v", err) + } +} + +func (c *CalculateRouter) PostHandle(request ziface.IRequest) { +} + +func main() { + + // 创建服务器 + s := znet.NewServer() + + s.SetOnConnStart(OnConnStart) + s.SetOnConnStop(OnConnStop) + go func() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for range ticker.C { + mutex.Lock() + count := connCount + mutex.Unlock() + fmt.Println("Connection stats", "clients", count, "goroutines", runtime.NumGoroutine()) + } + }() + + // 注册路由 + s.AddRouter(1001, &CalculateRouter{}) + + fmt.Printf("C10K Test Server starting on :%d\n", zconf.GlobalObject.TCPPort) + + // 启动服务 + s.Serve() +} From 043db5591968b6a218a7b7b3a55eaac4e9fba223 Mon Sep 17 00:00:00 2001 From: redfox <8564093@qq.com> Date: Wed, 13 May 2026 12:41:38 +0800 Subject: [PATCH 2/6] =?UTF-8?q?perf(c10k=5Ftest):=20=E7=A7=BB=E9=99=A4?= =?UTF-8?q?=E8=B0=83=E8=AF=95=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/c10k_test/server/main.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/examples/c10k_test/server/main.go b/examples/c10k_test/server/main.go index b2584a4c..8d8745ae 100644 --- a/examples/c10k_test/server/main.go +++ b/examples/c10k_test/server/main.go @@ -80,10 +80,6 @@ func (c *CalculateRouter) Handle(request ziface.IRequest) { // 计算 C = A + B packet.C = packet.A + packet.B - - zlog.Ins().DebugF("Received from %s: A=%d, B=%d, Calculated C=%d", - conn.RemoteAddr().String(), packet.A, packet.B, packet.C) - // 使用 SendMsg 回复(异步方式) err := conn.SendMsg(1, packet.Encode()) if err != nil { From 6f01931be3cfa28f6c5c6b2fe2d930025d4851d8 Mon Sep 17 00:00:00 2001 From: redfox <8564093@qq.com> Date: Wed, 13 May 2026 12:43:24 +0800 Subject: [PATCH 3/6] =?UTF-8?q?perf(c10k):=20=E4=BC=98=E5=8C=96=E5=8C=85?= =?UTF-8?q?=E5=BA=8F=E5=88=97=E5=8C=96=E5=B9=B6=E5=90=AF=E7=94=A8=E8=AF=B7?= =?UTF-8?q?=E6=B1=82=E9=97=B4=E9=9A=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/c10k_test/client/client.go | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/examples/c10k_test/client/client.go b/examples/c10k_test/client/client.go index e348bb20..26e5f496 100644 --- a/examples/c10k_test/client/client.go +++ b/examples/c10k_test/client/client.go @@ -1,7 +1,6 @@ package main import ( - "bytes" "encoding/binary" "fmt" "math" @@ -174,22 +173,18 @@ func runClient(clientID int, host string, port int, repeat int, sem chan struct{ defer conn.Close() for i := 0; i < repeat; i++ { - pkt := Packet{ - A: int32(rand.Intn(99999) + 1), - B: int32(rand.Intn(99999) + 1), - C: 0, - } + a := int32(rand.Intn(99999) + 1) + b := int32(rand.Intn(99999) + 1) + c := int32(0) - var buf bytes.Buffer - if err := binary.Write(&buf, binary.BigEndian, pkt); err != nil { - stats.addFailed(1) - continue - } - data := buf.Bytes() + var data [12]byte + binary.BigEndian.PutUint32(data[0:4], uint32(a)) + binary.BigEndian.PutUint32(data[4:8], uint32(b)) + binary.BigEndian.PutUint32(data[8:12], uint32(c)) sendTime := time.Now() - if err := sendMsg(conn, 1001, data); err != nil { + if err := sendMsg(conn, 1001, data[:]); err != nil { stats.addFailed(1) continue } @@ -202,7 +197,7 @@ func runClient(clientID int, host string, port int, repeat int, sem chan struct{ stats.addRespTime(respTime) } - //time.Sleep(time.Millisecond) + time.Sleep(time.Millisecond) } } From d9ec180b987f04019aaf0cc1577eb11c3a3c39f0 Mon Sep 17 00:00:00 2001 From: redfox <8564093@qq.com> Date: Wed, 13 May 2026 22:48:22 +0800 Subject: [PATCH 4/6] =?UTF-8?q?perf(znet):=20=E4=BC=98=E5=8C=96=20StartWri?= =?UTF-8?q?ter=20=E6=89=B9=E9=87=8F=E5=8F=91=E9=80=81=E4=B8=8E=E9=98=9F?= =?UTF-8?q?=E5=88=97=E9=9D=9E=E9=98=BB=E5=A1=9E=E5=86=99=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/c10k_test/bin/conf/zinx.json | 3 +- examples/c10k_test/client/client.go | 11 ++++-- examples/c10k_test/server/main.go | 10 ++++-- znet/connection.go | 49 +++++++++++++++++---------- 4 files changed, 48 insertions(+), 25 deletions(-) diff --git a/examples/c10k_test/bin/conf/zinx.json b/examples/c10k_test/bin/conf/zinx.json index aa40b03b..8e0e09b4 100644 --- a/examples/c10k_test/bin/conf/zinx.json +++ b/examples/c10k_test/bin/conf/zinx.json @@ -8,5 +8,6 @@ "MaxWorkerTaskLen": 1024, "MaxPacketSize": 4096, "LogFile": "zinx.log", + "LogIsolationLevel": 3, "LogLevel": "error" -} \ No newline at end of file +} diff --git a/examples/c10k_test/client/client.go b/examples/c10k_test/client/client.go index 26e5f496..3274bc87 100644 --- a/examples/c10k_test/client/client.go +++ b/examples/c10k_test/client/client.go @@ -184,13 +184,18 @@ func runClient(clientID int, host string, port int, repeat int, sem chan struct{ sendTime := time.Now() + conn.SetWriteDeadline(time.Now().Add(15 * time.Second)) if err := sendMsg(conn, 1001, data[:]); err != nil { - stats.addFailed(1) - continue + fmt.Printf("[client %d] send failed at i=%d: %v\n", clientID, i, err) + stats.addFailed(int64(repeat - i)) + break } + conn.SetReadDeadline(time.Now().Add(15 * time.Second)) if _, _, err := recvMsg(conn); err != nil { - stats.addFailed(1) + fmt.Printf("[client %d] recv failed at i=%d: %v\n", clientID, i, err) + stats.addFailed(int64(repeat - i)) + break } else { respTime := time.Since(sendTime).Seconds() * 1000 stats.addSuccess(1) diff --git a/examples/c10k_test/server/main.go b/examples/c10k_test/server/main.go index 8d8745ae..0ce2a824 100644 --- a/examples/c10k_test/server/main.go +++ b/examples/c10k_test/server/main.go @@ -80,10 +80,14 @@ func (c *CalculateRouter) Handle(request ziface.IRequest) { // 计算 C = A + B packet.C = packet.A + packet.B - // 使用 SendMsg 回复(异步方式) - err := conn.SendMsg(1, packet.Encode()) + // 使用 SendBuffMsg 回复(异步方式) + err := conn.SendBuffMsg(1, packet.Encode()) if err != nil { - zlog.Ins().ErrorF("SendMsg error: %v", err) + zlog.Ins().ErrorF("SendBuffMsg error (first): %v, retrying...", err) + // err = conn.SendBuffMsg(1, packet.Encode()) + // if err != nil { + // zlog.Ins().ErrorF("SendBuffMsg error (retry): %v", err) + // } } } diff --git a/znet/connection.go b/znet/connection.go index 190f6661..b4a1e9d0 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -196,28 +196,42 @@ func newClientConn(client ziface.IClient, conn net.Conn) ziface.IConnection { // (写消息Goroutine, 用户将数据发送给客户端) func (c *Connection) StartWriter() { zlog.Ins().InfoF("Writer Goroutine is running") - ticker := time.NewTicker(10 * time.Millisecond) defer func() { zlog.Ins().InfoF("%s [conn Writer exit!]", c.RemoteAddr().String()) - ticker.Stop() c.Flush() }() for { select { - case <-ticker.C: - err := c.Flush() - if err != nil { - zlog.Ins().ErrorF("Flush Buff Data error: %v Conn Writer exit", err) + case data, ok := <-c.msgBuffChan: + if !ok { + zlog.Ins().ErrorF("msgBuffChan is Closed") return } - case data, ok := <-c.msgBuffChan: - if ok { - if err := c.SendBuf(data); err != nil { - zlog.Ins().ErrorF("Send Buff Data error:, %s Conn Writer exit", err) - return + + if err := c.SendBuf(data); err != nil { + zlog.Ins().ErrorF("Send Buff Data error:, %s Conn Writer exit", err) + return + } + // 一次性循环读出 msgBuffChan 中所有剩余数据 + drainLoop: + for { + select { + case extra, ok2 := <-c.msgBuffChan: + if !ok2 { + zlog.Ins().ErrorF("msgBuffChan is Closed") + return + } + if err := c.SendBuf(extra); err != nil { + zlog.Ins().ErrorF("Send Buff Data error:, %s Conn Writer exit", err) + return + } + default: + break drainLoop } - } else { - zlog.Ins().ErrorF("msgBuffChan is Closed") + } + // 批量写入完成后一次性 flush + if err := c.Flush(); err != nil { + zlog.Ins().ErrorF("Flush Buff Data error: %v Conn Writer exit", err) return } case <-c.ctx.Done(): @@ -252,7 +266,7 @@ func (c *Connection) StartReader() { // (从conn的IO中读取数据到内存缓冲buffer中) n, err := c.conn.Read(buffer) if err != nil { - zlog.Ins().ErrorF("read msg head [read datalen=%d], error = %s", n, err) + zlog.Ins().ErrorF("read msg head [read datalen=%d], error = %s", n, err) // 不需要发 Log,正常关闭 return } zlog.Ins().DebugF("read buffer %s \n", hex.EncodeToString(buffer[0:n])) @@ -417,8 +431,6 @@ func (c *Connection) SendToQueue(data []byte, opts ...ziface.MsgSendOption) erro o(&opt) } - idleTimeout := time.NewTimer(opt.Timeout) - defer idleTimeout.Stop() if c.isClosed() == true { return errors.New("Connection closed when send buff msg") @@ -435,10 +447,11 @@ func (c *Connection) SendToQueue(data []byte, opts ...ziface.MsgSendOption) erro // Close all channels associated with the connection close(c.msgBuffChan) return errors.New("connection closed when send buff msg") - case <-idleTimeout.C: - return errors.New("send buff msg timeout") case c.msgBuffChan <- data: return nil + default: + zlog.Ins().ErrorF("send buff msg channel is full") + return errors.New("send buff msg channel is full") } } From 53b4bbfdfeecad10f1313ed1e49b9419c7611267 Mon Sep 17 00:00:00 2001 From: redfox <8564093@qq.com> Date: Thu, 14 May 2026 07:22:45 +0800 Subject: [PATCH 5/6] =?UTF-8?q?fix(znet):=20=E7=A7=BB=E9=99=A4SendToQueue?= =?UTF-8?q?=E4=B8=AD=E6=9C=AA=E4=BD=BF=E7=94=A8=E7=9A=84=E9=80=89=E9=A1=B9?= =?UTF-8?q?=E8=A7=A3=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- znet/connection.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/znet/connection.go b/znet/connection.go index b4a1e9d0..08d8b37b 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -423,15 +423,6 @@ func (c *Connection) SendToQueue(data []byte, opts ...ziface.MsgSendOption) erro go c.StartWriter() } - opt := ziface.MsgSendOptionObj{ - Timeout: 5 * time.Millisecond, - } - - for _, o := range opts { - o(&opt) - } - - if c.isClosed() == true { return errors.New("Connection closed when send buff msg") } From 68789e500649f84067d938b9a756114f216783a1 Mon Sep 17 00:00:00 2001 From: redfox <8564093@qq.com> Date: Sat, 16 May 2026 01:03:31 +0800 Subject: [PATCH 6/6] =?UTF-8?q?perf(znet):=20=E4=BC=98=E5=8C=96=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E5=85=B3=E9=97=AD=E9=80=BB=E8=BE=91=EF=BC=8C=E5=9C=A8?= =?UTF-8?q?reader=E5=8D=8F=E7=A8=8B=E4=B8=AD=E7=BB=9F=E4=B8=80=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E6=B8=85=E7=90=86=E6=93=8D=E4=BD=9C=EF=BC=8C=E6=AF=8F?= =?UTF-8?q?=E4=B8=AA=E8=BF=9E=E6=8E=A5=E8=8A=82=E7=9C=81=E4=B8=80=E4=B8=AA?= =?UTF-8?q?=E5=8D=8F=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 优化连接关闭逻辑,在reader协程中统一执行清理操作 2. 新增client_v2压测客户端,支持多尺寸数据包、随机发包策略与重连模拟 3. 扩展c10k测试服务端,添加小/中/大数据包路由处理 4. 添加client_v2二进制编译支持与gitignore规则 --- .gitignore | 4 + examples/c10k_test/Makefile | 1 + examples/c10k_test/client_v2/client.go | 372 +++++++++++++++++++++++++ examples/c10k_test/server/main.go | 153 +++++++++- znet/connection.go | 21 +- 5 files changed, 528 insertions(+), 23 deletions(-) create mode 100644 examples/c10k_test/client_v2/client.go diff --git a/.gitignore b/.gitignore index 0f80e41c..8ce409d2 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,7 @@ log /examples/zinx_websocket/minicode/miniprogram_npm rebase +examples/c10k_test/bin/server +examples/c10k_test/bin/client + +examples/c10k_test/bin/client_v2 diff --git a/examples/c10k_test/Makefile b/examples/c10k_test/Makefile index 171f8731..afc6462b 100644 --- a/examples/c10k_test/Makefile +++ b/examples/c10k_test/Makefile @@ -3,6 +3,7 @@ build: go build -o bin/server ./server/ go build -o bin/client ./client/ + go build -o bin/client_v2 ./client_v2/ clean: rm -rf bin/ diff --git a/examples/c10k_test/client_v2/client.go b/examples/c10k_test/client_v2/client.go new file mode 100644 index 00000000..2eca6296 --- /dev/null +++ b/examples/c10k_test/client_v2/client.go @@ -0,0 +1,372 @@ +package main + +import ( + "encoding/binary" + "fmt" + "math" + "math/rand" + "net" + "sort" + "strings" + "sync" + "time" +) + +const ( + CmdCalculate = 1001 + CmdSmall = 2001 + CmdMedium = 3001 + CmdLarge = 4001 +) + +type Stats struct { + success int64 + failed int64 + respTimes []float64 + respMu sync.Mutex + histBuckets []int64 + histMu sync.RWMutex +} + +const ( + histBucketCount = 20 +) + +func NewStats() *Stats { + return &Stats{ + respTimes: make([]float64, 0, 100000), + histBuckets: make([]int64, histBucketCount), + } +} + +func (s *Stats) addSuccess(n int64) { + s.respMu.Lock() + s.success += n + s.respMu.Unlock() +} + +func (s *Stats) addFailed(n int64) { + s.respMu.Lock() + s.failed += n + s.respMu.Unlock() +} + +func (s *Stats) addRespTime(ms float64) { + s.respMu.Lock() + s.respTimes = append(s.respTimes, ms) + s.respMu.Unlock() + + bucket := int(ms / 50) + if bucket >= histBucketCount { + bucket = histBucketCount - 1 + } + s.histMu.Lock() + s.histBuckets[bucket]++ + s.histMu.Unlock() +} + +func (s *Stats) getPercentile(p float64) float64 { + s.respMu.Lock() + defer s.respMu.Unlock() + + if len(s.respTimes) == 0 { + return 0 + } + + index := int(math.Ceil(float64(len(s.respTimes))*p)) - 1 + if index < 0 { + index = 0 + } + if index >= len(s.respTimes) { + index = len(s.respTimes) - 1 + } + + sorted := make([]float64, len(s.respTimes)) + copy(sorted, s.respTimes) + sort.Float64s(sorted) + return sorted[index] +} + +func (s *Stats) getAvg() float64 { + s.respMu.Lock() + defer s.respMu.Unlock() + + if len(s.respTimes) == 0 { + return 0 + } + + var sum float64 + for _, t := range s.respTimes { + sum += t + } + return sum / float64(len(s.respTimes)) +} + +func (s *Stats) printHistogram() { + s.histMu.RLock() + defer s.histMu.RUnlock() + + fmt.Println("\n响应时间分布 (ms):") + fmt.Println(strings.Repeat("-", 50)) + + var total int64 + for _, v := range s.histBuckets { + total += v + } + + maxBucket := int64(0) + for _, v := range s.histBuckets { + if v > maxBucket { + maxBucket = v + } + } + + for i := 0; i < histBucketCount; i++ { + low := i * 50 + high := (i + 1) * 50 + count := s.histBuckets[i] + percentage := float64(count) / float64(total) * 100 + + barLen := 0 + if maxBucket > 0 { + barLen = int(float64(count) / float64(maxBucket) * 40) + } + bar := strings.Repeat("█", barLen) + + fmt.Printf("%4d-%4dms: %8d (%5.2f%%) %s\n", low, high, count, percentage, bar) + } + fmt.Println(strings.Repeat("-", 50)) +} + +func sendMsg(conn net.Conn, msgID uint32, data []byte) error { + header := make([]byte, 8) + binary.BigEndian.PutUint32(header[0:4], msgID) + binary.BigEndian.PutUint32(header[4:8], uint32(len(data))) + _, err := conn.Write(append(header, data...)) + return err +} + +func recvMsg(conn net.Conn) (uint32, []byte, error) { + header := make([]byte, 8) + if _, err := conn.Read(header); err != nil { + return 0, nil, err + } + msgID := binary.BigEndian.Uint32(header[0:4]) + dataLen := binary.BigEndian.Uint32(header[4:8]) + body := make([]byte, dataLen) + if _, err := conn.Read(body); err != nil { + return 0, nil, err + } + return msgID, body, nil +} + +func genCalculatePacket() []byte { + a := int32(rand.Intn(99999) + 1) + b := int32(rand.Intn(99999) + 1) + c := int32(0) + var data [12]byte + binary.BigEndian.PutUint32(data[0:4], uint32(a)) + binary.BigEndian.PutUint32(data[4:8], uint32(b)) + binary.BigEndian.PutUint32(data[8:12], uint32(c)) + return data[:] +} + +func genSmallPacket() []byte { + data := make([]byte, 16) + binary.BigEndian.PutUint32(data[0:4], uint32(rand.Intn(10000))) + for i := 4; i < 16; i++ { + data[i] = byte(rand.Intn(256)) + } + return data +} + +func genMediumPacket() []byte { + data := make([]byte, 64) + binary.BigEndian.PutUint32(data[0:4], uint32(rand.Intn(10000))) + binary.BigEndian.PutUint32(data[4:8], uint32(rand.Intn(100000))) + for i := 8; i < 64; i++ { + data[i] = byte(rand.Intn(256)) + } + return data +} + +func genLargePacket() []byte { + data := make([]byte, 256) + binary.BigEndian.PutUint32(data[0:4], uint32(rand.Intn(10000))) + binary.BigEndian.PutUint32(data[4:8], uint32(rand.Intn(100000))) + binary.BigEndian.PutUint32(data[8:12], uint32(rand.Intn(1000000))) + for i := 12; i < 256; i++ { + data[i] = byte(rand.Intn(256)) + } + return data +} + +func getRandomCmd() uint32 { + switch rand.Intn(4) { + case 0: + return CmdCalculate + case 1: + return CmdSmall + case 2: + return CmdMedium + case 3: + return CmdLarge + default: + return CmdCalculate + } +} + +func getPacketByCmd(cmd uint32) []byte { + switch cmd { + case CmdCalculate: + return genCalculatePacket() + case CmdSmall: + return genSmallPacket() + case CmdMedium: + return genMediumPacket() + case CmdLarge: + return genLargePacket() + default: + return genCalculatePacket() + } +} + +func runClient(clientID int, host string, port int, repeat int, sem chan struct{}, stats *Stats, wg *sync.WaitGroup) { + defer wg.Done() + + sem <- struct{}{} + defer func() { <-sem }() + + conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + stats.addFailed(int64(repeat)) + return + } + + reconnectCount := 0 + + for i := 0; i < repeat; { + // 随机断开重连 (5% 概率) + if i > 0 && rand.Intn(20) == 0 { + conn.Close() + reconnectCount++ + // 模拟重连延迟 (10-100ms) + time.Sleep(time.Duration(rand.Intn(90)+10) * time.Millisecond) + + conn, err = net.Dial("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + stats.addFailed(int64(repeat - i)) + return + } + } + + // 随机决定是单发还是多发 + sendCount := 1 + if rand.Intn(10) < 3 { // 30% 概率发两个包 + sendCount = 2 + } + if i+sendCount > repeat { + sendCount = repeat - i + } + + sendTime := time.Now() + + // 发送多个包 + successSend := 0 + for j := 0; j < sendCount; j++ { + cmd := getRandomCmd() + data := getPacketByCmd(cmd) + conn.SetWriteDeadline(time.Now().Add(15 * time.Second)) + if err := sendMsg(conn, cmd, data); err != nil { + conn.Close() + reconnectCount++ + // 尝试重连 + time.Sleep(time.Duration(rand.Intn(100)+50) * time.Millisecond) + conn, err = net.Dial("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + stats.addFailed(int64(repeat - i - j)) + return + } + continue + } + successSend++ + } + + // 接收对应数量的回包 + successRecv := 0 + for j := 0; j < sendCount; j++ { + conn.SetReadDeadline(time.Now().Add(15 * time.Second)) + if _, _, err := recvMsg(conn); err != nil { + conn.Close() + reconnectCount++ + time.Sleep(time.Duration(rand.Intn(100)+50) * time.Millisecond) + conn, err = net.Dial("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + stats.addFailed(int64(repeat - i - j)) + return + } + j-- + continue + } + successRecv++ + } + + respTime := time.Since(sendTime).Seconds() * 1000 + stats.addSuccess(int64(successRecv)) + if successRecv > 0 { + stats.addRespTime(respTime / float64(successRecv)) + } + + i += successSend + + time.Sleep(time.Millisecond) + } + + conn.Close() +} + +func main() { + serverHost := "localhost" + serverPort := 8888 + totalConns := 10000 + repeatPerConn := 1000 + + fmt.Printf("🚀 启动压测 v2: %d 并发连接, 每个连接请求 %d 次\n", totalConns, repeatPerConn) + fmt.Printf(" - 命令号: 1001(12字节), 2001(16字节), 3001(64字节), 4001(256字节)\n") + fmt.Printf(" - 模式: 随机单发或多发(30%%概率发2包)\n") + fmt.Printf(" - 网络模拟: 随机断开重连(5%%概率), 重连延迟10-150ms\n") + startTime := time.Now() + + stats := NewStats() + sem := make(chan struct{}, 10000) + var wg sync.WaitGroup + + wg.Add(totalConns) + for i := 0; i < totalConns; i++ { + go runClient(i, serverHost, serverPort, repeatPerConn, sem, stats, &wg) + } + + wg.Wait() + duration := time.Since(startTime) + + qps := float64(stats.success) / duration.Seconds() + + fmt.Println("\n" + strings.Repeat("=", 50)) + fmt.Printf("🏁 压测报告 v2\n") + fmt.Printf("总耗时: %.2f 秒\n", duration.Seconds()) + fmt.Printf("成功次数: %d\n", stats.success) + fmt.Printf("失败次数: %d\n", stats.failed) + fmt.Printf("有效 QPS: %.2f\n", qps) + fmt.Println(strings.Repeat("-", 50)) + fmt.Printf("响应时间 (ms):\n") + fmt.Printf(" 平均: %.2f\n", stats.getAvg()) + fmt.Printf(" 最小: %.2f\n", stats.getPercentile(0)) + fmt.Printf(" P50: %.2f\n", stats.getPercentile(0.50)) + fmt.Printf(" P90: %.2f\n", stats.getPercentile(0.90)) + fmt.Printf(" P95: %.2f\n", stats.getPercentile(0.95)) + fmt.Printf(" P99: %.2f\n", stats.getPercentile(0.99)) + fmt.Printf(" 最大: %.2f\n", stats.getPercentile(1.0)) + fmt.Println(strings.Repeat("=", 50)) + + stats.printHistogram() +} diff --git a/examples/c10k_test/server/main.go b/examples/c10k_test/server/main.go index 0ce2a824..0d62302b 100644 --- a/examples/c10k_test/server/main.go +++ b/examples/c10k_test/server/main.go @@ -41,6 +41,51 @@ func Decode(buf []byte) *Packet { } } +// SmallPacket 小数据包 (16字节) +type SmallPacket struct { + ID int32 + Data [12]byte +} + +func (p *SmallPacket) Encode() []byte { + buf := make([]byte, 16) + binary.BigEndian.PutUint32(buf[0:4], uint32(p.ID)) + copy(buf[4:16], p.Data[:]) + return buf +} + +// MediumPacket 中等数据包 (64字节) +type MediumPacket struct { + ID int32 + Seq int32 + Data [56]byte +} + +func (p *MediumPacket) Encode() []byte { + buf := make([]byte, 64) + binary.BigEndian.PutUint32(buf[0:4], uint32(p.ID)) + binary.BigEndian.PutUint32(buf[4:8], uint32(p.Seq)) + copy(buf[8:64], p.Data[:]) + return buf +} + +// LargePacket 大数据包 (256字节) +type LargePacket struct { + ID int32 + Seq int32 + Counter int32 + Data [244]byte +} + +func (p *LargePacket) Encode() []byte { + buf := make([]byte, 256) + binary.BigEndian.PutUint32(buf[0:4], uint32(p.ID)) + binary.BigEndian.PutUint32(buf[4:8], uint32(p.Seq)) + binary.BigEndian.PutUint32(buf[8:12], uint32(p.Counter)) + copy(buf[12:256], p.Data[:]) + return buf +} + var ( connCount int32 mutex sync.Mutex @@ -60,40 +105,114 @@ func OnConnStop(conn ziface.IConnection) { //logger.Info("Client disconnected", "conn_id", conn.GetConnID(), "addr", conn.RemoteAddrString(), "total", connCount) } -// CalculateRouter 计算路由 +// CalculateRouter 计算路由 (命令号 1001) type CalculateRouter struct{} func (c *CalculateRouter) PreHandle(request ziface.IRequest) { } func (c *CalculateRouter) Handle(request ziface.IRequest) { - // 获取消息数据 data := request.GetData() conn := request.GetConnection() - - // 解码 Packet packet := Decode(data) if packet == nil { zlog.Ins().ErrorF("Invalid packet data from %s", conn.RemoteAddr().String()) return } - - // 计算 C = A + B packet.C = packet.A + packet.B - // 使用 SendBuffMsg 回复(异步方式) - err := conn.SendBuffMsg(1, packet.Encode()) + err := conn.SendBuffMsg(1001, packet.Encode()) if err != nil { - zlog.Ins().ErrorF("SendBuffMsg error (first): %v, retrying...", err) - // err = conn.SendBuffMsg(1, packet.Encode()) - // if err != nil { - // zlog.Ins().ErrorF("SendBuffMsg error (retry): %v", err) - // } + zlog.Ins().ErrorF("SendBuffMsg error: %v", err) } } func (c *CalculateRouter) PostHandle(request ziface.IRequest) { } +// SmallRouter 小数据路由 (命令号 2001) +type SmallRouter struct{} + +func (s *SmallRouter) PreHandle(request ziface.IRequest) { +} + +func (s *SmallRouter) Handle(request ziface.IRequest) { + data := request.GetData() + conn := request.GetConnection() + if len(data) < 16 { + zlog.Ins().ErrorF("Invalid small packet from %s", conn.RemoteAddr().String()) + return + } + pkt := &SmallPacket{ + ID: int32(binary.BigEndian.Uint32(data[0:4])), + } + copy(pkt.Data[:], data[4:16]) + pkt.ID++ + err := conn.SendBuffMsg(2001, pkt.Encode()) + if err != nil { + zlog.Ins().ErrorF("SendBuffMsg error: %v", err) + } +} + +func (s *SmallRouter) PostHandle(request ziface.IRequest) { +} + +// MediumRouter 中等数据路由 (命令号 3001) +type MediumRouter struct{} + +func (m *MediumRouter) PreHandle(request ziface.IRequest) { +} + +func (m *MediumRouter) Handle(request ziface.IRequest) { + data := request.GetData() + conn := request.GetConnection() + if len(data) < 64 { + zlog.Ins().ErrorF("Invalid medium packet from %s", conn.RemoteAddr().String()) + return + } + pkt := &MediumPacket{ + ID: int32(binary.BigEndian.Uint32(data[0:4])), + Seq: int32(binary.BigEndian.Uint32(data[4:8])), + } + copy(pkt.Data[:], data[8:64]) + pkt.Seq++ + err := conn.SendBuffMsg(3001, pkt.Encode()) + if err != nil { + zlog.Ins().ErrorF("SendBuffMsg error: %v", err) + } +} + +func (m *MediumRouter) PostHandle(request ziface.IRequest) { +} + +// LargeRouter 大数据路由 (命令号 4001) +type LargeRouter struct{} + +func (l *LargeRouter) PreHandle(request ziface.IRequest) { +} + +func (l *LargeRouter) Handle(request ziface.IRequest) { + data := request.GetData() + conn := request.GetConnection() + if len(data) < 256 { + zlog.Ins().ErrorF("Invalid large packet from %s", conn.RemoteAddr().String()) + return + } + pkt := &LargePacket{ + ID: int32(binary.BigEndian.Uint32(data[0:4])), + Seq: int32(binary.BigEndian.Uint32(data[4:8])), + Counter: int32(binary.BigEndian.Uint32(data[8:12])), + } + copy(pkt.Data[:], data[12:256]) + pkt.Counter++ + err := conn.SendBuffMsg(4001, pkt.Encode()) + if err != nil { + zlog.Ins().ErrorF("SendBuffMsg error: %v", err) + } +} + +func (l *LargeRouter) PostHandle(request ziface.IRequest) { +} + func main() { // 创建服务器 @@ -108,12 +227,18 @@ func main() { mutex.Lock() count := connCount mutex.Unlock() - fmt.Println("Connection stats", "clients", count, "goroutines", runtime.NumGoroutine()) + + var m runtime.MemStats + runtime.ReadMemStats(&m) + fmt.Println("Connection stats", "time", time.Now().Format("2006-01-02 15:04:05"), "clients", count, "goroutines", runtime.NumGoroutine(), "MEM:", m.Alloc/1024/1024, "MB") } }() // 注册路由 s.AddRouter(1001, &CalculateRouter{}) + s.AddRouter(2001, &SmallRouter{}) + s.AddRouter(3001, &MediumRouter{}) + s.AddRouter(4001, &LargeRouter{}) fmt.Printf("C10K Test Server starting on :%d\n", zconf.GlobalObject.TCPPort) diff --git a/znet/connection.go b/znet/connection.go index 08d8b37b..4d675736 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -245,7 +245,10 @@ func (c *Connection) StartWriter() { func (c *Connection) StartReader() { zlog.Ins().InfoF("[Reader Goroutine is running]") defer zlog.Ins().InfoF("%s [conn Reader exit!]", c.RemoteAddr().String()) - defer c.Stop() + defer func() { + c.Stop() + c.doClose() + }() defer func() { if err := recover(); err != nil { zlog.Ins().ErrorF("connID=%d, panic err=%v", c.GetConnID(), err) @@ -266,7 +269,7 @@ func (c *Connection) StartReader() { // (从conn的IO中读取数据到内存缓冲buffer中) n, err := c.conn.Read(buffer) if err != nil { - zlog.Ins().ErrorF("read msg head [read datalen=%d], error = %s", n, err) // 不需要发 Log,正常关闭 + zlog.Ins().ErrorF("read msg head [read datalen=%d], error = %s", n, err) // 不需要发 Log,正常关闭 return } zlog.Ins().DebugF("read buffer %s \n", hex.EncodeToString(buffer[0:n])) @@ -332,14 +335,8 @@ func (c *Connection) Start() { // (开启用户从客户端读取数据流程的Goroutine) go c.StartReader() - select { - case <-c.ctx.Done(): - c.finalizer() + // 直接退出,让 StartReader协程 来处理关闭逻辑 - // 归还workerid - freeWorker(c) - return - } } // Stop stops the connection and ends the current connection state. @@ -621,3 +618,9 @@ func (s *Connection) InvokeCloseCallbacks() { defer s.closeCallbackMutex.RUnlock() s.closeCallback.Invoke() } + +func (s *Connection) doClose() { + s.finalizer() + // 归还workerid + freeWorker(s) +}