go的grpc的三种流模式通信
- 1、grpc通信模式简介
- 2、stream.proto文件
- 3、服务端代码 server.go
- 4、客户端代码client.go
- 5、测试说明
1、grpc通信模式简介
grpc的数据传输可以分为4种模式:
简单模式 (一元调用)
服务端流模式 (服务端返回实时股票数据给前台)
客户端流模式 (物联网硬件设备向后端发送数据)
双向流模式 (聊天场景)
2、stream.proto文件
syntax = "proto3";
option go_package = "./;proto";
// grpc的数据传输可以分为4种模式:
// 简单模式(一元调用)、服务端流模式(服务端返回实时股票数据给前台)、客户端流模式(物联网硬件设备向后端发送数据)、双向流模式(聊天场景)
service Greeter {
rpc GetStream(StreamReqData) returns (stream StreamResData);// 服务端流模式
rpc PutStream(stream StreamReqData) returns (StreamResData);// 客户端流模式
rpc AllStream(stream StreamReqData) returns (stream StreamResData);// 双向流模式
}
// 请求数据结构体
message StreamReqData{
string data = 1;
}
// 响应数据结构体
message StreamResData{
string data = 1;
}
生成客户端代理stub程序、服务端代理stub程序、接口相关代码的命令:
protoc --go_out=. --go-grpc_out=. stream.proto
3、服务端代码 server.go
package main
import (
"Go_Bible/stream_grpc_test/proto"
"fmt"
"google.golang.org/grpc"
"net"
"sync"
"time"
)
// 端口
const PORT = ":8088"
// 自定义服务结构体
type MyServer struct {
proto.UnimplementedGreeterServer
}
// 实现服务端流模式方法
func (s *MyServer) GetStream(req *proto.StreamReqData, srvStr proto.Greeter_GetStreamServer) error {
i := 0
for {
i++
// 向客户端发送响应结构体
_ = srvStr.SendMsg(&proto.StreamResData{
Data:fmt.Sprintf("%v", time.Now().Unix()),
})
time.Sleep(time.Second)
// 每隔一秒发送1次,总共发送10次
if i >= 10 {
break
}
}
return nil
}
// 实现客户端流模式方法
func (s *MyServer) PutStream(cliStr proto.Greeter_PutStreamServer) error {
for {
if data, err := cliStr.Recv(); err != nil {
fmt.Println("接受客户端的流数据失败:" + err.Error())
break
}else {
fmt.Println("接受到客户端的流数据成功:" + data.Data)
}
}
return nil
}
// 实现双向流模式方法
func (s *MyServer) AllStream(allStr proto.Greeter_AllStreamServer) error {
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
i := 0
for {
i++
// 向客户端发送响应结构体
_ = allStr.SendMsg(&proto.StreamResData{Data: fmt.Sprintf("我是服务器 %d", i)})
time.Sleep(time.Second)
if i >= 10 {
break
}
}
}()
go func() {
defer wg.Done()
for {
data, _ := allStr.Recv()
fmt.Println("服务端接受到客户端的流数据成功:" + data.Data)
}
}()
// 等待相关协程开启调用完成
wg.Wait()
return nil
}
func main() {
// 1、监听端口
listener, err := net.Listen("tcp", PORT)
if err != nil {
panic("监听端口失败:" + err.Error())
}
// 2、创建服务
server := grpc.NewServer()
// 3、注册服务
proto.RegisterGreeterServer(server, &MyServer{})
// 4、启动服务
err = server.Serve(listener)
if err != nil {
panic("启动服务失败:" + err.Error())
}
}
4、客户端代码client.go
package main
import (
"Go_Bible/stream_grpc_test/proto"
"context"
"fmt"
"google.golang.org/grpc"
"strconv"
"sync"
"time"
)
// testGetStream 测试服务端流模式
func testGetStream(client proto.GreeterClient){
res, err := client.GetStream(context.Background(), &proto.StreamReqData{Data: "我是客户端"})
if err != nil {
panic("服务端流模式,从服务端获取数据失败:" + err.Error())
}
for {
data, err := res.Recv()
if err != nil {
fmt.Println("从服务端获取数据失败:" + err.Error())
break
}
fmt.Println("从服务端获取到数据成功:" + data.Data)
}
}
// testPutStream 测试客户端流模式
func testPutStream(client proto.GreeterClient){
putStrClient, err := client.PutStream(context.Background())
if err != nil {
panic("客户端流模式,向服务端发送数据失败:" + err.Error())
}
i := 0
for {
i++
fmt.Println(i)
if err = putStrClient.Send(&proto.StreamReqData{Data: strconv.Itoa(i)}); err != nil {
fmt.Println("喜爱那个服务端发送数据失败:" + err.Error())
break
}
time.Sleep(time.Second)
if i >= 10 {
break
}
}
}
// testAllStream 测试双向流模式
func testAllStream(client proto.GreeterClient){
allStrClient, err := client.AllStream(context.Background())
if err != nil {
panic("双向流模式获取失败:" + err.Error())
}
wg := sync.WaitGroup{}
wg.Add(2)
// 向服务端发送数据 因为服务端会从客户端源源不断获取数据,因此服务端也不会自动关闭
go func() {
defer wg.Done()
i := 0
for {
i++
err = allStrClient.Send(&proto.StreamReqData{Data: fmt.Sprintf("双向流模式发送的数据:%d", i)})
if err != nil {
fmt.Println("双向流模式向服务端发送数据失败:" + err.Error())
break
}
if i >= 10 {
break
}
}
}()
// 从服务端接受数据,因为双向流模式时,客户端需要从服务端源源不断接受数据,因此不会关闭
go func() {
defer wg.Done()
for {
data, err := allStrClient.Recv()
if err != nil {
fmt.Println("双向流模式从服务端接受数据失败:" + err.Error())
break
}
fmt.Println("双向流模式收到服务端消息:" + data.Data)
}
}()
wg.Wait()
}
func main() {
// 1、拨号
conn, err := grpc.Dial("localhost:8088", grpc.WithInsecure())
if err != nil {
panic("连接失败:" + err.Error())
}
// 关闭连接
defer conn.Close()
//2、创建客户端
client := proto.NewGreeterClient(conn)
/*3、测试服务端流模式*/
//testGetStream(client)
/*4、测试客户端流模式*/
//testPutStream(client)
/*5、测试双向流模式*/
testAllStream(client)
}
5、测试说明
先启动服务端server.go
再启动客户端client.go,调用对应函数进行测试