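Today I'm writing up my notes from trying out gRPC the week before last. The official gRPC site has thorough examples for every supported language, and in practice I found it friendly and convenient to work with (there is even C# support): http://www.grpc.io/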
The underlying principles are documented there too, for anyone who is interested.
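According to the official site, its main use case is cross-platform (cross-language) message exchange between server and client, using Protocol Buffers over HTTP (gRPC runs on HTTP/2), with an emphasis on high performance and easy integration.
Today, let's take an API that would normally be built as RESTful and make a gRPC version of it.
Assume the following data-exchange scenario:
There is an existing RESTful web API with a single URL that accepts the parameters behavior, text_data, and binary_data via POST: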
http://localhost:40404/data/entry
behavior=functionX&text_data={json_data}&binary_data={binary_data}
The original service mainly uses behavior to decide which business logic to run.
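To make the starting point concrete, here is a minimal sketch of that behavior-based dispatch in Python. Only the three parameter names come from the description above; the handler `handle_function_x`, the `HANDLERS` table, and the `dispatch` helper are hypothetical stand-ins, since the real service's business logic is not shown in this post.

```python
from urllib.parse import parse_qs, urlencode


# Hypothetical handler; stands in for whatever "functionX" really does.
def handle_function_x(text_data, binary_data):
    return "functionX handled %d chars of text" % len(text_data)


# Map each behavior value to the code that implements it.
HANDLERS = {"functionX": handle_function_x}


def dispatch(form_body):
    """Parse a form-encoded POST body and route it by its behavior field."""
    params = {key: values[0] for key, values in parse_qs(form_body).items()}
    handler = HANDLERS.get(params.get("behavior", ""))
    if handler is None:
        return "unknown behavior"
    return handler(params.get("text_data", ""), params.get("binary_data", ""))


# Build the same kind of body a client would POST to /data/entry.
body = urlencode({"behavior": "functionX", "text_data": '{"a": 1}', "binary_data": "AAEC"})
print(dispatch(body))
```

The gRPC version below keeps the same idea: behavior travels as an ordinary field and the server decides what to do with it.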
With gRPC, you first define the interface (the .proto file) and then run code generation.
Edit your gRPC proto file:
syntax = "proto3";
option java_multiple_files = true;
option java_package = "test";
option java_outer_classname = "TestDataTransferProto";
option objc_class_prefix = "HLW";
// Package name as it appears in the generated code excerpts further down.
package grpc_mesocollection;
// The greeting service definition.
service CollectionData {
  // Sends a data save request
  rpc Entry (dataRequest) returns (dataReply) {}
  // Sends a data query request
  rpc Query (queryRequest) returns (queryReply) {}
}

// The request message for saving (creating or updating) data.
message dataRequest {
  string behavior = 1;
  string version = 2;
  string text_data = 3;
  // Declared as string here; proto3 also has a bytes type for raw binary.
  string binary_data = 4;
}

// The response message for a save request.
message dataReply {
  bool isSuccess = 1;
  string message = 2;
  string entity_from = 3;
  string entity_uuid = 4;
  repeated string payloads = 5;
}

// The request message for querying data.
message queryRequest {
  string behavior = 1;
  string version = 2;
  string entity_from = 3;
  string entity_uuid = 4;
}

// The response message for a query request.
message queryReply {
  bool isSuccess = 1;
  string message = 2;
  repeated string payloads = 3;
}
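We defined two methods, Entry and Query, for clients to call.
Entry handles creating and updating data. Its parameters are behavior, text_data, binary_data, and version; version is ignored entirely for now and is only there to support versioning later.
Query handles data retrieval. The main difference is the extra entity_from and entity_uuid fields, which together uniquely identify the record to look up.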
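The field numbers in the proto file (= 1, = 3, and so on) are what actually travel on the wire, not the field names. As a rough illustration, the sketch below hand-encodes two string fields of a dataRequest using the Protocol Buffers wire format (tag, length, UTF-8 bytes). The helpers `encode_varint` and `encode_string_field` are written for this post only; in real code the generated class's SerializeToString does this for you.

```python
def encode_varint(value):
    """Encode a non-negative integer as a protobuf base-128 varint."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_string_field(field_number, text):
    """Encode one string field as tag + length + UTF-8 payload."""
    payload = text.encode("utf-8")
    tag = (field_number << 3) | 2  # wire type 2 = length-delimited
    return encode_varint(tag) + encode_varint(len(payload)) + payload


# A dataRequest with behavior (field 1) and text_data (field 3) set.
# Fields left at their default (empty string) are simply not written.
wire = encode_string_field(1, "functionX") + encode_string_field(3, '{"a": 1}')
print(wire.hex())
print(len(wire), "bytes")
```

Each string field costs two bytes of overhead here (one tag byte, one length byte), which is part of why the encoding is compact compared with repeating parameter names in every request.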
With the proto file defined, you can call the code generation tool: cd into the protos directory and run the following command.
<your Python application path>\models\protos>
python -m grpc_tools.protoc -I../protos --python_out=. --grpc_python_out=. ../protos/datatransfer.proto
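If compilation fails, protoc prints an error message pointing at the offending line.
After fixing the proto file and building again, you get two files: datatransfer_pb2.py and datatransfer_pb2_grpc.py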
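If you would rather trigger code generation from a Python script than type the command each time, grpcio-tools exposes the same entry point as `grpc_tools.protoc.main`. The sketch below assumes the same relative paths as the command above; the helper names `build_protoc_args` and `run_codegen` are my own.

```python
def build_protoc_args(proto_dir, proto_file, out_dir="."):
    """Return the argument list equivalent to the command line above."""
    return [
        "grpc_tools.protoc",  # plays the role of argv[0]
        "-I" + proto_dir,
        "--python_out=" + out_dir,
        "--grpc_python_out=" + out_dir,
        proto_dir + "/" + proto_file,
    ]


def run_codegen(proto_dir="../protos", proto_file="datatransfer.proto"):
    """Run protoc in-process; returns protoc's exit code (0 on success)."""
    # Imported here so build_protoc_args works without grpcio-tools installed.
    from grpc_tools import protoc
    return protoc.main(build_protoc_args(proto_dir, proto_file))


print(" ".join(build_protoc_args("../protos", "datatransfer.proto")))
```

Calling run_codegen() from the protos directory should then produce the same two files as the command-line invocation.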
datatransfer_pb2 mainly defines the message objects. An excerpt:
_DATAREQUEST = _descriptor.Descriptor(
  name='dataRequest',
  full_name='grpc_mesocollection.dataRequest',
  filename=None,
  file=DESCRIPTOR,
  containing_type=None,
  fields=[
    _descriptor.FieldDescriptor(
      name='behavior', full_name='grpc_mesocollection.dataRequest.behavior', index=0,
      number=1, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
    _descriptor.FieldDescriptor(
      name='version', full_name='grpc_mesocollection.dataRequest.version', index=1,
      number=2, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
    _descriptor.FieldDescriptor(
      name='text_data', full_name='grpc_mesocollection.dataRequest.text_data', index=2,
      number=3, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
    _descriptor.FieldDescriptor(
      name='binary_data', full_name='grpc_mesocollection.dataRequest.binary_data', index=3,
      number=4, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
  ],
  # (remainder omitted)
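The bare numbers in that excerpt (type=9, cpp_type=9, label=1) are codes from protobuf's FieldDescriptor. The small lookup below, written for this post, turns them back into readable names; the tables copy the numeric values defined in protobuf's descriptor.proto, and `describe_field` is a hypothetical helper, not part of the generated module.

```python
# Numeric codes used by protobuf's FieldDescriptor.
FIELD_TYPES = {
    1: "double", 2: "float", 3: "int64", 4: "uint64", 5: "int32",
    6: "fixed64", 7: "fixed32", 8: "bool", 9: "string", 10: "group",
    11: "message", 12: "bytes", 13: "uint32", 14: "enum",
    15: "sfixed32", 16: "sfixed64", 17: "sint32", 18: "sint64",
}
CPP_TYPES = {
    1: "int32", 2: "int64", 3: "uint32", 4: "uint64", 5: "double",
    6: "float", 7: "bool", 8: "enum", 9: "string", 10: "message",
}
LABELS = {1: "optional", 2: "required", 3: "repeated"}


def describe_field(name, number, field_type, cpp_type, label):
    """Render one FieldDescriptor's numeric codes as readable text."""
    return "%s = %d: %s %s (cpp: %s)" % (
        name, number, LABELS[label], FIELD_TYPES[field_type], CPP_TYPES[cpp_type])


# Values copied from the excerpt above.
print(describe_field("behavior", 1, 9, 9, 1))
print(describe_field("binary_data", 4, 9, 9, 1))
```

So all four dataRequest fields are singular strings. Had binary_data been declared as bytes, its type code would be 12 rather than 9, and a repeated field such as payloads would carry label=3.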
datatransfer_pb2_grpc mainly defines the behavior (the service stub and servicer). An excerpt:
class CollectionDataStub(object):
  """The greeting service definition.
  """

  def __init__(self, channel):
    """Constructor.

    Args:
      channel: A grpc.Channel.
    """
    self.Entry = channel.unary_unary(
        '/grpc_mesocollection.CollectionData/Entry',
        request_serializer=dataRequest.SerializeToString,
        response_deserializer=dataReply.FromString,
        )
    self.Query = channel.unary_unary(
        '/grpc_mesocollection.CollectionData/Query',
        request_serializer=queryRequest.SerializeToString,
        response_deserializer=queryReply.FromString,
        )
# (remainder omitted)
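Basically, gRPC's code generation produces a lot of the base interface classes and scaffolding for us, but the implementation is still ours to plug in. By default, every generated servicer method raises NotImplementedError, so we have to subclass the servicer and override them.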
class CollectionDataServicer(object):
  """The greeting service definition.
  """

  def Entry(self, request, context):
    """Sends a data save request
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

  def Query(self, request, context):
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')
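The overriding code is below. This is where you would hook in your own internal services (the service calls themselves are omitted here to keep the focus on the integration):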
from models.protos.datatransfer_pb2 import dataReply, queryReply
from models.protos.datatransfer_pb2_grpc import CollectionDataServicer


class RPCService(CollectionDataServicer):
    def Entry(self, request, context):
        behavior = request.behavior
        version = request.version
        text_data = request.text_data
        binary_data = request.binary_data
        # TODO: call the internal service
        return dataReply(isSuccess=True, message="OK, behavior=" + behavior,
                         entity_from="test_from", entity_uuid="test_uuid")

    def Query(self, request, context):
        behavior = request.behavior
        version = request.version
        entity_uuid = request.entity_uuid
        entity_from = request.entity_from
        # TODO: call the internal service
        # queryReply only defines isSuccess, message, and payloads,
        # so the two sample values go into the repeated payloads field.
        return queryReply(isSuccess=True, message="OK, behavior=" + behavior,
                          payloads=["A", "B"])
Following the example code on the official site, the program below starts a gRPC server that listens on a fixed port and registers the class we just overrode. Save it as grpc_server.py.
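from concurrent import futures
import time
import grpc
from models.protos.datatransfer_pb2_grpc import add_CollectionDataServicer_to_server

_ONE_DAY_IN_SECONDS = 60 * 60 * 24

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    add_CollectionDataServicer_to_server(RPCService(), server)
    server.add_insecure_port('[::]:40404')
    server.start()
    # server.start() does not block, so keep the main thread alive.
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == '__main__':
    serve()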
It starts up without raising any exception, so let's try calling it.
First, write the client code in grpc_client.py.
Next, test it against localhost:40404. The complete grpc_client.py:
import grpc
import time
from models.protos.datatransfer_pb2 import dataRequest
from models.protos.datatransfer_pb2_grpc import CollectionDataStub


def run(behavior="", text_data="", binary_data=""):
    if behavior != "" and (text_data != "" or binary_data != ""):
        # A new channel is opened for every call.
        channel = grpc.insecure_channel('localhost:40404')
        stub = CollectionDataStub(channel)
        response = stub.Entry(dataRequest(behavior=behavior, text_data=text_data, binary_data=binary_data))
        # The "Payload" slot prints response.message again, as in the output below.
        print("CollectionDataStub client received: IsSuccess:%s, Message:%s, Payload:%s " % (response.isSuccess, response.message, response.message))
    else:
        print("empty payload and behavior!")


if __name__ == '__main__':
    max_count = 1000
    start_time = time.time()
    for x in range(0, max_count):
        run(behavior="behavior" + str(x), text_data="payload" + str(x), binary_data="payload" + str(x))
    print('grpc_makeRequest spend %s seconds' % (time.time() - start_time))
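The client above opens a new channel for every request. A variation worth trying is to open one channel and reuse the stub for the whole loop. I have not measured whether this changes the timing, so treat it as a sketch only; `send_all` and `main` are names I made up, and the import paths are assumed to be the same as in grpc_client.py.

```python
import time


def send_all(stub, make_request, count):
    """Send `count` Entry calls through one stub and return the replies."""
    replies = []
    for x in range(count):
        request = make_request(
            behavior="behavior" + str(x),
            text_data="payload" + str(x),
            binary_data="payload" + str(x),
        )
        replies.append(stub.Entry(request))
    return replies


def main():
    # Imported here so send_all can be exercised without a running server.
    import grpc
    from models.protos.datatransfer_pb2 import dataRequest
    from models.protos.datatransfer_pb2_grpc import CollectionDataStub

    start_time = time.time()
    # One channel for the whole run; it is closed when the with-block exits.
    with grpc.insecure_channel("localhost:40404") as channel:
        replies = send_all(CollectionDataStub(channel), dataRequest, 1000)
    print("sent %d requests in %s seconds" % (len(replies), time.time() - start_time))
```

With grpc_server.py running, calling main() sends the same 1000 requests as before over a single channel.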
I set the loop to 1000 iterations, and the calls did reach our grpc_server program, which means the gRPC path works end to end.
(earlier output omitted)
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior990, Payload:OK, behavior=behavior990
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior991, Payload:OK, behavior=behavior991
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior992, Payload:OK, behavior=behavior992
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior993, Payload:OK, behavior=behavior993
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior994, Payload:OK, behavior=behavior994
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior995, Payload:OK, behavior=behavior995
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior996, Payload:OK, behavior=behavior996
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior997, Payload:OK, behavior=behavior997
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior998, Payload:OK, behavior=behavior998
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior999, Payload:OK, behavior=behavior999
grpc_makeRequest spend 4.50599217414856 seconds
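But how will it perform in a production environment? Compared with REST, is it faster when there is a lot of data, many records, or binary payloads? Will the transfers go through correctly? Will there be blocking problems? I'll leave those for next time.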
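As a starting point for that comparison, here is a small timing helper that records per-call latency instead of only the total. It is a sketch: `time_calls` and `summarize` are my own helpers, and `call` can be any function, for example one gRPC request or one REST request. The last lines work out the average from the run above (1000 calls in about 4.506 seconds).

```python
import statistics
import time


def time_calls(call, count):
    """Invoke call(i) `count` times and return each call's latency in seconds."""
    latencies = []
    for i in range(count):
        start = time.perf_counter()
        call(i)
        latencies.append(time.perf_counter() - start)
    return latencies


def summarize(latencies):
    """Reduce a list of latencies (seconds) to a few headline numbers."""
    total = sum(latencies)
    return {
        "count": len(latencies),
        "total_s": total,
        "mean_ms": statistics.mean(latencies) * 1000,
        "max_ms": max(latencies) * 1000,
        "calls_per_s": len(latencies) / total if total else float("inf"),
    }


# The run above: 1000 calls in 4.50599217414856 seconds.
mean_ms = 4.50599217414856 / 1000 * 1000
print("mean per call: %.3f ms" % mean_ms)
```

Running the same helper against the REST endpoint and the gRPC stub, with identical payloads, would give numbers that can be compared directly.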