標籤: Python

grpc於Python的實作,以資料交換Api介面為例

grpc於Python的實作,以資料交換Api介面為例

今天來整理一下,上上週試做grpc的筆記,其實grpc官網對各語言的支持範例都寫的很完整,實際操作下來,其實也覺得挺友善方便的(連C#也都有):http://www.grpc.io/

原理在這,有興趣者可以看看

其應用場景據官網呈現,看起來主要是能讓你透過protocol-buff的機制,透過結合http協定,達成server-client之間的跨平台(多語言之間)的訊息傳輸,強調高效能,好整合

今天來試做一般會做成restful的api改成grpc的版本吧

我們假設資料交換的場景如下:

已經有一個現成的restful的webapi,透過統一的Url,可透過post傳入參數為behavior, text_data, binary_data

http://localhost:40404/data/entry

behavior=functionX&text_data={json_data}&binary_data={binary_data}

 

原始的service主要透過behavior來判斷要進行何種商業邏輯

走grpc要先定義介面(protos),再進行code gen

編輯你的grpc protos


syntax = "proto3";

option java_multiple_files = true;
option java_package = "test";
option java_outer_classname = "TestDataTransferProto";
option objc_class_prefix = "HLW";

package requestHtml;

// The greeting service definition.
service CollectionData {
  // Sends a data save request
  rpc Entry (dataRequest) returns (dataReply) {}
  rpc Query (queryRequest) returns (queryReplay){}
}

// The request message containing the user's name.
message dataRequest {
  string behavior = 1;
  string version = 2;
  string text_data = 3;
  string binary_data = 4
}

// The response message containing the greetings
message dataReply {
  bool isSuccess = 1;
  string message = 2;
  string entity_from = 3
  string entity_uuid = 4
  string[] payloads = 5
}

// The request message containing the user's name.
message queryRequest {
  string behavior = 1;
  string version = 2;
  string entity_from = 3;
  string entity_uuid = 4
}

// The response message containing the greetings
message queryReply {
  bool isSuccess = 1;
  string message = 2;
  string[] payloads = 3;
}

我們定義了一個Entry與Query兩個method,提供呼叫

Entry提供資料新增與更新,參數包含了behavior, text_data,binary_data, version,這邊version先完全不管他,僅未來提供版本機制

Query提供資料查詢取得,主要差異在多一個entity_from與entiry_uuid,提供查詢的唯一識別值

定義完protos,就可以呼叫code generate tool,直接cd到protos的目錄後,輸入以下指令

你的python 應用程式路徑\models\protos>

python -m grpc_tools.protoc -I../protos --python_out=. --grpc_python_out=. ../protos/datatransfer.proto

若編譯失敗,會顯示如下圖錯誤訊息。

調整後,重新建置就會產生2個檔案:datatransfer_pb2.py與datatransfer_pb2_grpc.py

datatransfer_pb2主要是定義物件,擷錄如下:

_DATAREQUEST = _descriptor.Descriptor(
  name='dataRequest',
  full_name='grpc_mesocollection.dataRequest',
  filename=None,
  file=DESCRIPTOR,
  containing_type=None,
  fields=[
    _descriptor.FieldDescriptor(
      name='behavior', full_name='grpc_mesocollection.dataRequest.behavior', index=0,
      number=1, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
    _descriptor.FieldDescriptor(
      name='version', full_name='grpc_mesocollection.dataRequest.version', index=1,
      number=2, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
    _descriptor.FieldDescriptor(
      name='text_data', full_name='grpc_mesocollection.dataRequest.text_data', index=2,
      number=3, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
    _descriptor.FieldDescriptor(
      name='binary_data', full_name='grpc_mesocollection.dataRequest.binary_data', index=3,
      number=4, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
  ],略

datatransfer_pb2_grpc主要是定義行為,擷錄如下:

class CollectionDataStub(object):
  """The greeting service definition.
  """

  def __init__(self, channel):
    """Constructor.

    Args:
      channel: A grpc.Channel.
    """
    self.Entry = channel.unary_unary(
        '/grpc_mesocollection.CollectionData/Entry',
        request_serializer=dataRequest.SerializeToString,
        response_deserializer=dataReply.FromString,
        )
    self.Query = channel.unary_unary(
        '/grpc_mesocollection.CollectionData/Query',
        request_serializer=queryRequest.SerializeToString,
        response_deserializer=queryReply.FromString,
        )
略

 

基本上,grpc的code gen,是可以幫我們產生許多基礎的介面類別與框架,實作面還是要我們來介入對接,預設上的行為,在code gen的結果都是raise notimplemented error,所以一定要來這邊繼承覆寫掉

class CollectionDataServicer(object):
  """The greeting service definition.
  """

  def Entry(self, request, context):
    """Sends a data save request
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

  def Query(self, request, context):
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

 

覆寫的code如下,我們可自訂對接的內部服務(這邊為了演示整合段,省略對接服務code)

class RPCService(CollectionDataServicer):
  def Entry(self, request, context):
	
      behavior = request.behavior
      version = request.version
      text_data = request.text_data
      binary_data = request.binary_data
      #todo call Service
      return dataReply(isSuccess=True, message = "OK, behavior="+behavior,  entity_from="test_from", entity_uuid = "test_uuid")

  def Query(self, request, context):

      behavior = request.behavior
      version = request.version
      entity_uuid = request.entity_uuid
      entity_from = request.entity_from

      #todo call Service
      return queryReply( isSuccess=True, message = "OK, behavior="+behavior, payload_text = "A" , payload_file = "B" )

依照官網的example code,我們知道可以透過以下 程式起一個grpc的server,監聽固定的port,同時掛載上去我們剛剛覆寫的類別,命名為grpc_server.py檔案

def serve():
  server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
  add_CollectionDataServicer_to_server(RPCService(), server)
  server.add_insecure_port('[::]:40404')
  server.start()
  try:
    while True:
      time.sleep(_ONE_DAY_IN_SECONDS)
  except KeyboardInterrupt:
    server.stop(0)

if __name__ == '__main__':
  serve()

跑起來看來沒有exception,那來試著呼叫看看吧

先建立grpc_client.py的程式碼


def run(behavior="", text_data="", binary_data=""):
    if behavior != "" and (text_data !="" or binary_data != ""):
        channel = grpc.insecure_channel( 'localhost:40404' )
        stub = CollectionDataStub( channel )
        response = stub.Entry( dataRequest( behavior = behavior, text_data=text_data, binary_data=binary_data ) )
        print( "CollectionDataReceiverStub client received: IsSuccess:%s, Message:%s, Payload:%s " % (response.isSuccess, response.message, response.message ))
    else:
        print("empty payload and behavior!")

if __name__ == '__main__':
    max_count = 10
    start_time = time.time()
    for x in range( 0, max_count ):
        run(  behavior="behavior"+str(x) , text_data="payload"+str(x), binary_data="payload"+str(x))
    print( 'grpc_makeRequest spend %s seconds' % (time.time() - start_time) )

接著對localhost:40404進行測試吧,命名為grpc_client.py檔案

import grpc
import time

from models.protos.datatransfer_pb2 import dataRequest
from models.protos.datatransfer_pb2_grpc import CollectionDataStub


def run(behavior="", text_data="", binary_data=""):
    if behavior != "" and (text_data !="" or binary_data != ""):
        channel = grpc.insecure_channel( 'localhost:40404' )
        stub = CollectionDataStub( channel )
        response = stub.Entry( dataRequest( behavior = behavior, text_data=text_data, binary_data=binary_data ) )
        print( "CollectionDataStub client received: IsSuccess:%s, Message:%s, Payload:%s " % (response.isSuccess, response.message, response.message ))
    else:
        print("empty payload and behavior!")

if __name__ == '__main__':
    max_count = 1000
    start_time = time.time()
    for x in range( 0, max_count ):
        run(  behavior="behavior"+str(x) , text_data="payload"+str(x), binary_data="payload"+str(x))
    print( 'grpc_makeRequest spend %s seconds' % (time.time() - start_time) )

跑起來以後,我設定迴圈是1000次,確實有進到我們的grpc_server的程式去,這代表著grpc這條路已經通了

前略…CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior990, Payload:OK, behavior=behavior990
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior991, Payload:OK, behavior=behavior991
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior992, Payload:OK, behavior=behavior992
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior993, Payload:OK, behavior=behavior993
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior994, Payload:OK, behavior=behavior994
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior995, Payload:OK, behavior=behavior995
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior996, Payload:OK, behavior=behavior996
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior997, Payload:OK, behavior=behavior997
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior998, Payload:OK, behavior=behavior998
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior999, Payload:OK, behavior=behavior999
grpc_makeRequest spend 4.50599217414856 seconds

不過實際應用到生產環境後的效能如何?相比於restful,資料量大筆數多以及binary型別的資料時效能是否更佳?可否正常的傳輸?會不會有blocking的問題呢?就留待下次分曉吧

[Cassandra] 要如何透過python與CQL來新增User Define Type欄位的資料

[Cassandra] 要如何透過python與CQL來新增User Define Type欄位的資料

Cassandra是一個強大的nosql(在特定集群下資料的throughput可是mongo db的10幾倍),我只知道將其發揚光大的facebook,其歷史淵源我也不知多少,就先在此略過

nosql強調的是資料寫入、查詢的效能,但是為了突顯其效能,nosql有其內部的運作方式,因此有些使用上的特性必須去滿足他或是盡量要照著他的查詢規則,例如…cql的語法裡面,你的where條件必須一定要有partition key,而且還要照順序作條件,下update語法時,更新的條件,你一定要包含partition key,也只能使用=運算子,無法像sql那樣,批次大量地去異動資料(如果有錯,歡迎糾正),而且不是關鍵式資料庫特性,不需要正規化的那麼全面,甚至就是橫向的長下去也沒關系…但是若當資料格式是有1對多關系的時候,我仍然希望能存入到nosql的時候怎麼辦呢?

例如:

訂單order下,可能有很多的order item(購買項目),訂單匯總了金額,購買者,運送地址,購買項目關聯了訂單,並描述商品編號,商品數量,以往在RDB裡面,我們可能會設計2張表,然後透過join與transaction的方式來維護相關的資料。先舉個例子:我隨便設計一個產品明細類,然後再加入一個主表,我希望欄位就有1對多的產品明細關係,因此我訂了一個欄位叫order_items,然後型別就是list加上剛剛訂的order_product_item

 

以這樣的一張表有4個欄位,但是有明細與地址的進階型別,我們要如何新增呢?

若透過cql,只需要寫的像cql,並帶入json like的資料結構,就可以新增到我們的CustomerOrder的表了

insert Into "CustomerOrders" (customer_id, order_amount, order_items, shipping_address)
values(1, 2000, [
		{id:4, name:'game', count:1, price:1600, memo:{'tag': 'action game'}},
		{id:20, name:'toy', count:2, price:200, memo:{'tag': 'made by hand'}}
	   ], {zip_code:'210', nation:'Taiwan', city:'NewTaipeiCity', address1:'testAddress1', address2:''});

查詢結果:

注意,欄位裡面可不是string,而是有指定型別的結構,因此是可以作為後續查詢的條件。在這邊為止…先切入今天希望記錄的主題,就是如何透過python來綁定這種user define type。在傳統sql與程式寫transaction sql時,我們常常會這樣寫

insert into a (fieldA, fieldB) Values(@fieldA, @fieldB)

然後帶入paramter的方式,來防止sql injection與型別判斷,而cql呢…他也可以做到類似這樣的寫法:

insert into a (fieldA, fieldB) Values(:fieldA, :fieldB)
insert into a (fieldA, fieldB) Values(?, ?)

然後定義一個字典物件,一併丟給cassandra的driver:session去執行,就可以如我們以前在sql常看到的參數binding一樣,去執行語法並新增

parameter = dict( fieldA=fieldA_Value, fieldBcreator=fieldB_Value)

然而若是user define type的話,如何解呢?list物件,應該就是對應到python的list,應該沒問題,那其他的欄位,有辦法對應下一層dictionary嗎? 試了老半天都是卡關…但是不確定是卡在動態參數binding那段,還是字典裡型別的問題 後來發現了關鍵的官網文件在此:https://datastax.github.io/python-driver/user_defined_types.html 擷圖如下 什麼!!竟然可以直接繼承object物件,然後定義一個初始化方法,指定所有user define type的名稱(帶入的型別由外面檢查) 就可以做到?!我一開始還想試另一條路(有空再說吧…),繼承cassandra usertype型別,使用它的orm方式來做,但是看起來用那個方法無法綁cql一起運作,他有他的獨立運作方式。

 

看了他的試範讓我馬上試著建立類別,並如上圖的方式,直接帶parameter定義的欄位,以我們訂單的例子:

class Address(object):
   def __init__(self, zip_code, nation, city, address1, address2): #欄位都要有,名稱都要正確
       self.zip_code=zip_code
       self.nation=nation
       self.city = city
       self.address1 = address1
       self.address2 = address2
       
class OrderItem(object):
    def __init__(self, id, name='', count=0, price=0, memo=None):  # 欄位都要有,名稱都要正確
        self.id = id
        self.name = name
        self.count = count
        self.price = price
        self.memo = memo
cql = """
insert Into "CustomerOrders" (customer_id, order_amount, order_items, shipping_address)
values(:customer_id, :order_amount, :order_items, :address);
"""

order_items = []
order_items.append(OrderItem(4, "game", 1, 1600, {"tag":"action game"}))
order_items.append(OrderItem(20, "toy", 2, 200, {"tag":"hand made"}))


parameter = dict(
    customer_id=1,
    order_amount=2000,
    order_items=order_items,
    address=Address(zip_code=970, nation="Taiwan", city="NewTaipeCity", address1="Test", address2=""))

session.execute(cql, parameter)

下指令的方式已經像使用sql一樣簡單,而透過上述user define type的定義與python的程式操作可以更靈活的使用cassandra這個強大nosql的特性囉。只是回頭看看自己…捚頭苦幹…這個議題n小時,真的有點吃力不討好,在此紀念我今天try error浪費的光陰,不如官方文件看仔細一點…

ps: Word Press有沒有什麼可以插入code可以用很好的樣式來呈現的外掛呀…改天來研究一下…(先試試這個pastacode…相容於php7)

autopep8 – pep8 auto styling tool 如何整合到Pycharm

autopep8 – pep8 auto styling tool 如何整合到Pycharm

身為一個半路出家的python幼幼班,常常為了轉換語言,而在命名、排版上與直覺發生出入

python有一個python pep8 style(相關介紹)

為了盡量符合pep8的style,這次我先決定套用工具幫我自動排版,希望等到這種排版看久了,自然也會內化吧orz

首先在cmd上輸入:(註:請先使用系統管理員執行cmd,否則會權限不足)

pip install autopep8

 

接著開啟pycharm,點選上方的導覽列file->settings 切換到tools / external tools的頁籤

接著新增一個external tools,參考以下輸入:

Name, Description都可以依自己喜好設定,tool settings這邊請照以下輸入

program: autopep8
parameters: –in-place –aggressive –aggressive $FilePath$ (–in-place代表是直接對該檔案調整,依測試是不可逆的)
working directory: $ProjectFileDir$

以上希望知道參數說明,可以自己打autopep8 -h 研讀


 

其中output filters來新增一組regular exp

範例:$FILE_PATH$\:$LINE$\:$COLUMN$\:.*

以上都設定好以後,我們在程式原始檔上,右鍵:external tools,這時會發現有autopep8的選項可以點

autopep8 –in-place –aggressive –aggressive C:\Users\paul\Desktop\WorkFolder\collection_proxy\data_transfer.py

 

我可以看到console其實他背後就是對指定檔案下以上指令,因此自己也可以這樣去下或批次調整,若希望不要改到原始檔案的話,請記得拿掉–in-place指令

想自行輸出的話,可以參考類似以下語法

autopep8 –aggressive –aggressive C:\Users\paul\Desktop\WorkFolder\meso_collection\meso_collection_proxy\data_model\exchange\exchange_data.py > ./exchange_data_new.py

Before

After

其實格式化後,至少我離python的常用排版更貼近了一些,至於程式可讀性這種東西,我覺得還是見仁見智,必竟程式碼閱讀主要是團隊裡的人看的懂,後續接手維護的人員看的懂最重要,因此程式最重要的還是首重命名,再來排版只要順眼,當然團隊中每個人都可能有自己順眼的方式,但要要找到一個共識,其實沒有絕對的標準答案的

Python相關學習資源(待閱讀)- Python , Nose , Testing..

Python相關學習資源(待閱讀)- Python , Nose , Testing..

http://webcache.googleusercontent.com/search?q=cache:DUqkgFQ2irkJ:www.cnblogs.com/liaofeifight/p/5148717.html+&cd=1&hl=zh-TW&ct=clnk&gl=tw

https://segmentfault.com/a/1190000002965620

http://www.chinesepython.org/pythonfoundry/marrpydoc/python1.htm

https://chrisarndt.de/talks/rupy/2008/output/slides.html : https://read01.com/JooyP.html

http://webcache.googleusercontent.com/search?q=cache:Mbml-raWdOEJ:www.wklken.me/posts/2015/08/29/python-source-memory-1.html+&cd=11&hl=zh-TW&ct=clnk&gl=tw

nose introduction

http://www.cnblogs.com/soaringEveryday/p/5044007.html