Tags: grpc

Implementing gRPC in Python: a data-exchange API interface as an example

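Today I'm writing up my notes from trying out gRPC the week before last. The official gRPC site has thorough examples for every supported language (even C#), and in practice I found it friendly and convenient to work with: http://www.grpc.io/

For those interested, the underlying principles are explained in the linked material.

According to the official site, its main use case is cross-platform (cross-language) message passing between server and client, built on Protocol Buffers combined with HTTP/2, with an emphasis on high performance and easy integration.

Today, let's take an API that would normally be built as a RESTful service and rework it as a gRPC version.

Assume the data-exchange scenario is as follows:

There is an existing RESTful web API with a single URL that accepts the parameters behavior, text_data, and binary_data via POST: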

http://localhost:40404/data/entry

behavior=functionX&text_data={json_data}&binary_data={binary_data}
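For reference, the form body above could be assembled on the client side roughly like this. It is only a minimal sketch using the standard library; the helper name build_entry_body and the sample values are my own assumptions, not part of the original service.

```python
import json
from urllib.parse import parse_qs, urlencode

ENTRY_URL = "http://localhost:40404/data/entry"  # URL from the scenario above


def build_entry_body(behavior, json_data, binary_text):
    """Build the form-encoded POST body (hypothetical helper)."""
    return urlencode({
        "behavior": behavior,
        "text_data": json.dumps(json_data),  # JSON document sent as text
        "binary_data": binary_text,          # assumed to be text-encoded already
    })


body = build_entry_body("functionX", {"id": 1}, "payload")
print(body)
# Round trip: the server side would see the same three parameters
print(parse_qs(body)["behavior"])
```

Every kind of request goes through the same URL, so the server only learns what to do by reading the behavior parameter.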

 

The original service relies mainly on behavior to decide which business logic to run.
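A behavior-driven service like this is often written as a dispatch table. The sketch below shows the idea; the handler names and the "save" / "delete" behaviors are hypothetical stand-ins, since the real business logic is not shown in this post.

```python
def save_document(text_data, binary_data):
    # Hypothetical handler standing in for real business logic
    return {"isSuccess": True, "message": "saved"}


def delete_document(text_data, binary_data):
    # Hypothetical handler standing in for real business logic
    return {"isSuccess": True, "message": "deleted"}


# Maps each behavior value to the function that handles it
HANDLERS = {
    "save": save_document,
    "delete": delete_document,
}


def dispatch(behavior, text_data="", binary_data=""):
    """Pick the handler for a behavior, or report an unknown behavior."""
    handler = HANDLERS.get(behavior)
    if handler is None:
        return {"isSuccess": False, "message": "unknown behavior: " + behavior}
    return handler(text_data, binary_data)


print(dispatch("save", text_data="{}"))
print(dispatch("something_else"))
```

The same routing can sit behind a gRPC method later on, because the request still carries a behavior field.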

With gRPC, you first define the interface (protos) and then run code generation.

Edit your gRPC protos:


syntax = "proto3";

option java_multiple_files = true;
option java_package = "test";
option java_outer_classname = "TestDataTransferProto";
option objc_class_prefix = "HLW";

package grpc_mesocollection;

// The data collection service definition.
service CollectionData {
  // Sends a data save (insert or update) request
  rpc Entry (dataRequest) returns (dataReply) {}
  // Sends a data query request
  rpc Query (queryRequest) returns (queryReply) {}
}

// The request message for Entry.
message dataRequest {
  string behavior = 1;
  string version = 2;
  string text_data = 3;
  string binary_data = 4;
}

// The response message for Entry.
message dataReply {
  bool isSuccess = 1;
  string message = 2;
  string entity_from = 3;
  string entity_uuid = 4;
  repeated string payloads = 5;
}

// The request message for Query.
message queryRequest {
  string behavior = 1;
  string version = 2;
  string entity_from = 3;
  string entity_uuid = 4;
}

// The response message for Query.
message queryReply {
  bool isSuccess = 1;
  string message = 2;
  repeated string payloads = 3;
}

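We have defined two methods, Entry and Query, for clients to call.

Entry handles inserting and updating data. Its parameters are behavior, text_data, binary_data, and version; version is ignored entirely for now and is only there to support a versioning mechanism later.

Query handles data retrieval. The main difference is the additional entity_from and entity_uuid fields, which together serve as the unique identifier for the lookup.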
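One thing to keep in mind about binary_data: a proto3 string field must hold valid UTF-8 text, so raw bytes cannot go into it directly. One possible workaround, sketched below on the assumption that both sides agree on base64, is to encode before sending and decode after receiving. Declaring the field as bytes in the proto would be the alternative. The helper names are hypothetical.

```python
import base64


def encode_binary(raw: bytes) -> str:
    """Turn raw bytes into ASCII text that is safe for a string field."""
    return base64.b64encode(raw).decode("ascii")


def decode_binary(text: str) -> bytes:
    """Recover the original bytes on the receiving side."""
    return base64.b64decode(text.encode("ascii"))


raw = bytes([0, 255, 16, 128])  # not valid UTF-8 as-is
text = encode_binary(raw)
print(text)
print(decode_binary(text) == raw)
```

Base64 makes the payload roughly a third larger, which is worth considering if binary payloads are big.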

Once the protos are defined, you can invoke the code generation tool. cd into the protos directory and run the following command:

<your Python application path>\models\protos>

python -m grpc_tools.protoc -I../protos --python_out=. --grpc_python_out=. ../protos/datatransfer.proto

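If compilation fails, an error message like the one in the screenshot below is displayed.

After fixing the problem and rebuilding, two files are generated: datatransfer_pb2.py and datatransfer_pb2_grpc.py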
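The same generation step can also be driven from a Python script, which is handy for build automation. The sketch below assumes the grpcio-tools package is installed and uses its grpc_tools.protoc.main entry point; the helper names build_protoc_args and compile_proto are my own.

```python
def build_protoc_args(proto_dir, proto_file, out_dir="."):
    """Assemble the same arguments as the command line above."""
    return [
        "grpc_tools.protoc",  # placeholder for argv[0]
        "-I" + proto_dir,
        "--python_out=" + out_dir,
        "--grpc_python_out=" + out_dir,
        proto_dir + "/" + proto_file,
    ]


def compile_proto(proto_dir="../protos", proto_file="datatransfer.proto"):
    """Run the generator; a return value of 0 means success."""
    # Imported lazily so the argument builder works without grpcio-tools
    from grpc_tools import protoc
    return protoc.main(build_protoc_args(proto_dir, proto_file))


print(build_protoc_args("../protos", "datatransfer.proto"))
```

Calling compile_proto() from the protos directory should then be equivalent to the command shown earlier.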

datatransfer_pb2 mainly defines the message objects. An excerpt:

_DATAREQUEST = _descriptor.Descriptor(
  name='dataRequest',
  full_name='grpc_mesocollection.dataRequest',
  filename=None,
  file=DESCRIPTOR,
  containing_type=None,
  fields=[
    _descriptor.FieldDescriptor(
      name='behavior', full_name='grpc_mesocollection.dataRequest.behavior', index=0,
      number=1, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
    _descriptor.FieldDescriptor(
      name='version', full_name='grpc_mesocollection.dataRequest.version', index=1,
      number=2, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
    _descriptor.FieldDescriptor(
      name='text_data', full_name='grpc_mesocollection.dataRequest.text_data', index=2,
      number=3, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
    _descriptor.FieldDescriptor(
      name='binary_data', full_name='grpc_mesocollection.dataRequest.binary_data', index=3,
      number=4, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
  ],  # ...rest omitted

datatransfer_pb2_grpc mainly defines the behavior (the client stub and the servicer base class). An excerpt:

class CollectionDataStub(object):
  """The greeting service definition.
  """

  def __init__(self, channel):
    """Constructor.

    Args:
      channel: A grpc.Channel.
    """
    self.Entry = channel.unary_unary(
        '/grpc_mesocollection.CollectionData/Entry',
        request_serializer=dataRequest.SerializeToString,
        response_deserializer=dataReply.FromString,
        )
    self.Query = channel.unary_unary(
        '/grpc_mesocollection.CollectionData/Query',
        request_serializer=queryRequest.SerializeToString,
        response_deserializer=queryReply.FromString,
        )
(rest omitted)

 

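Essentially, gRPC code generation produces most of the base interface classes and scaffolding for us, but the actual implementation is still ours to wire in. By default, every generated servicer method raises NotImplementedError, so we have to subclass the servicer and override those methods.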

class CollectionDataServicer(object):
  """The greeting service definition.
  """

  def Entry(self, request, context):
    """Sends a data save request
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

  def Query(self, request, context):
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

 

The overriding code is below. We can plug in whatever internal service we want (the service-side code is omitted here to keep the focus on the integration):

from models.protos.datatransfer_pb2 import dataReply, queryReply
from models.protos.datatransfer_pb2_grpc import CollectionDataServicer


class RPCService(CollectionDataServicer):
  def Entry(self, request, context):
    behavior = request.behavior
    version = request.version
    text_data = request.text_data
    binary_data = request.binary_data
    # TODO: call the internal service
    return dataReply(isSuccess=True, message="OK, behavior=" + behavior,
                     entity_from="test_from", entity_uuid="test_uuid")

  def Query(self, request, context):
    behavior = request.behavior
    version = request.version
    entity_uuid = request.entity_uuid
    entity_from = request.entity_from
    # TODO: call the internal service
    # payloads is a repeated string field, so it takes a list
    return queryReply(isSuccess=True, message="OK, behavior=" + behavior,
                      payloads=["A", "B"])

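Following the example code on the official site, the program below starts a gRPC server listening on a fixed port and registers the class we just overrode. Save it as grpc_server.py:

from concurrent import futures
import time

import grpc

from models.protos.datatransfer_pb2_grpc import add_CollectionDataServicer_to_server

_ONE_DAY_IN_SECONDS = 60 * 60 * 24


def serve():
  # RPCService is the overriding class defined above
  server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
  add_CollectionDataServicer_to_server(RPCService(), server)
  server.add_insecure_port('[::]:40404')
  server.start()
  try:
    # server.start() does not block, so keep the main thread alive
    while True:
      time.sleep(_ONE_DAY_IN_SECONDS)
  except KeyboardInterrupt:
    server.stop(0)

if __name__ == '__main__':
  serve()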

It starts up without any exception, so let's try calling it.

First, create the code for grpc_client.py.


Next, test against localhost:40404. The full grpc_client.py file is:

import grpc
import time

from models.protos.datatransfer_pb2 import dataRequest
from models.protos.datatransfer_pb2_grpc import CollectionDataStub


def run(behavior="", text_data="", binary_data=""):
    if behavior != "" and (text_data !="" or binary_data != ""):
        channel = grpc.insecure_channel( 'localhost:40404' )
        stub = CollectionDataStub( channel )
        response = stub.Entry( dataRequest( behavior = behavior, text_data=text_data, binary_data=binary_data ) )
        print( "CollectionDataStub client received: IsSuccess:%s, Message:%s, Payload:%s " % (response.isSuccess, response.message, response.message ))
    else:
        print("empty payload and behavior!")

if __name__ == '__main__':
    max_count = 1000
    start_time = time.time()
    for x in range( 0, max_count ):
        run(  behavior="behavior"+str(x) , text_data="payload"+str(x), binary_data="payload"+str(x))
    print( 'grpc_makeRequest spend %s seconds' % (time.time() - start_time) )

With the loop set to 1000 iterations, the run did reach our grpc_server program, which means the gRPC path works end to end.

(earlier output omitted)…CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior990, Payload:OK, behavior=behavior990
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior991, Payload:OK, behavior=behavior991
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior992, Payload:OK, behavior=behavior992
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior993, Payload:OK, behavior=behavior993
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior994, Payload:OK, behavior=behavior994
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior995, Payload:OK, behavior=behavior995
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior996, Payload:OK, behavior=behavior996
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior997, Payload:OK, behavior=behavior997
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior998, Payload:OK, behavior=behavior998
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior999, Payload:OK, behavior=behavior999
grpc_makeRequest spend 4.50599217414856 seconds
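Note that the client above opens a new channel for every request. gRPC channels are meant to be reused, so a variant that shares one channel across the whole loop may cut some of that overhead. The sketch below is untested against the numbers above; send_batch and main are hypothetical names, and the module paths are assumed to match the ones used in grpc_client.py.

```python
import time


def send_batch(entry, build_request, count):
    """Call entry(count times) and return the replies plus elapsed seconds."""
    start = time.perf_counter()
    replies = [entry(build_request(i)) for i in range(count)]
    return replies, time.perf_counter() - start


def main(count=1000):
    # Imported here so send_batch stays usable without gRPC installed
    import grpc
    from models.protos.datatransfer_pb2 import dataRequest
    from models.protos.datatransfer_pb2_grpc import CollectionDataStub

    def build_request(i):
        return dataRequest(behavior="behavior%d" % i,
                           text_data="payload%d" % i,
                           binary_data="payload%d" % i)

    # One channel and one stub shared by every call in the batch
    with grpc.insecure_channel("localhost:40404") as channel:
        stub = CollectionDataStub(channel)
        replies, elapsed = send_batch(stub.Entry, build_request, count)
    print("%d replies in %s seconds" % (len(replies), elapsed))
```

With the server from grpc_server.py running, calling main() sends the same 1000 requests over a single connection, which makes it easy to compare against the timing above.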

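But how does it perform once it is actually running in production? Compared with REST, is it faster when there is a lot of data, many records, or binary payloads? Does the data transfer correctly? Will there be blocking problems? Those questions are left for next time.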