作者: paul

[平行處理] 考慮使用concurrent.futures模組來達成真正的平行處理

[平行處理] 考慮使用concurrent.futures模組來達成真正的平行處理

[原文擷自Effective Python中文版]結合自己的實作與理解

對於擁有越來越多CPU核心的現代電腦而言,我們可以合理的假設,改善程式碼效能其中一個解決方案就是平行處理(Parallelism)

然而,Python的GIL讓執行緒無法達到真正的平行處理,所以這個選項出局了。另一個常見的建議是將程式碼中效能最關鍵的地方以C語言改寫為一個擴充功能模組(extension module)

C讓我們更靠近底層機器,執行的比Python還要快,消除了平行處理的必要。

Read More Read More

[Effective Python Selection] PEP8風格指南精選

[Effective Python Selection] PEP8風格指南精選

PEP 8又稱為Python Enhancement Proposal #8是python程式碼編寫格式的風格指南。

當然,我們可以用任何想要的方式去撰寫python程式碼,只要語法正確就行了

然而使用一致的Style好處,就是讓我們的程式碼更具備可讀性、更易親近,在一個社群或團隊中,能促使專案協作的更順利。

Read More Read More

[記錄]使用Docker Compose 在Docker上試用Apache Mesos 和Marathon

[記錄]使用Docker Compose 在Docker上試用Apache Mesos 和Marathon

如主題,記錄可用的Docker-Compose檔案,預設yml檔名必須是docker-compose

mkdir mesos

cd mesos

sudo nano docker-compose.yml

輸入以下內容

zookeeper:
  image: garland/zookeeper
  ports:
   - "2181:2181"
   - "2888:2888"
   - "3888:3888"
mesosmaster:
  image: garland/mesosphere-docker-mesos-master
  ports:
   - "5050:5050"
  links:
   - zookeeper:zk
  environment:
   - MESOS_ZK=zk://zk:2181/mesos
   - MESOS_LOG_DIR=/var/log/mesos
   - MESOS_QUORUM=1
   - MESOS_REGISTRY=in_memory
   - MESOS_WORK_DIR=/var/lib/mesos
marathon:
  image: garland/mesosphere-docker-marathon
  links:
   - zookeeper:zk
   - mesosmaster:master
  command: --master zk://zk:2181/mesos --zk zk://zk:2181/marathon
  ports:
   - "8080:8080"
mesosslave:
  image: garland/mesosphere-docker-mesos-master:latest
  ports:
   - "5051:5051"
  links:
   - zookeeper:zk
   - mesosmaster:master
  entrypoint: mesos-slave
  environment:
   - MESOS_HOSTNAME=#xxx.xxx.xxx.xxx
   - MESOS_MASTER=zk://zk:2181/mesos
   - MESOS_LOG_DIR=/var/log/mesos
   - MESOS_LOGGING_LEVEL=INFO

sudo docker-compose up -d

grpc於Python的實作,以資料交換Api介面為例

grpc於Python的實作,以資料交換Api介面為例

今天來整理一下,上上週試做grpc的筆記,其實grpc官網對各語言的支持範例都寫的很完整,實際操作下來,其實也覺得挺友善方便的(連C#也都有):http://www.grpc.io/

原理在這,有興趣者可以看看

其應用場景據官網呈現,看起來主要是能讓你透過protocol-buff的機制,透過結合http協定,達成server-client之間的跨平台(多語言之間)的訊息傳輸,強調高效能,好整合

今天來試做一般會做成restful的api改成grpc的版本吧

我們假設資料交換的場景如下:

已經有一個現成的restful的webapi,透過統一的Url,可透過post傳入參數為behavior, text_data, binary_data

http://localhost:40404/data/entry

behavior=functionX&text_data={json_data}&binary_data={binary_data}

 

原始的service主要透過behavior來判斷要進行何種商業邏輯

走grpc要先定義介面(protos),再進行code gen

編輯你的grpc protos


syntax = "proto3";

option java_multiple_files = true;
option java_package = "test";
option java_outer_classname = "TestDataTransferProto";
option objc_class_prefix = "HLW";

package requestHtml;

// The greeting service definition.
service CollectionData {
  // Sends a data save request
  rpc Entry (dataRequest) returns (dataReply) {}
  rpc Query (queryRequest) returns (queryReplay){}
}

// The request message containing the user's name.
message dataRequest {
  string behavior = 1;
  string version = 2;
  string text_data = 3;
  string binary_data = 4
}

// The response message containing the greetings
message dataReply {
  bool isSuccess = 1;
  string message = 2;
  string entity_from = 3
  string entity_uuid = 4
  string[] payloads = 5
}

// The request message containing the user's name.
message queryRequest {
  string behavior = 1;
  string version = 2;
  string entity_from = 3;
  string entity_uuid = 4
}

// The response message containing the greetings
message queryReply {
  bool isSuccess = 1;
  string message = 2;
  string[] payloads = 3;
}

我們定義了一個Entry與Query兩個method,提供呼叫

Entry提供資料新增與更新,參數包含了behavior, text_data,binary_data, version,這邊version先完全不管他,僅未來提供版本機制

Query提供資料查詢取得,主要差異在多一個entity_from與entiry_uuid,提供查詢的唯一識別值

定義完protos,就可以呼叫code generate tool,直接cd到protos的目錄後,輸入以下指令

你的python 應用程式路徑\models\protos>

python -m grpc_tools.protoc -I../protos --python_out=. --grpc_python_out=. ../protos/datatransfer.proto

若編譯失敗,會顯示如下圖錯誤訊息。

調整後,重新建置就會產生2個檔案:datatransfer_pb2.py與datatransfer_pb2_grpc.py

datatransfer_pb2主要是定義物件,擷錄如下:

_DATAREQUEST = _descriptor.Descriptor(
  name='dataRequest',
  full_name='grpc_mesocollection.dataRequest',
  filename=None,
  file=DESCRIPTOR,
  containing_type=None,
  fields=[
    _descriptor.FieldDescriptor(
      name='behavior', full_name='grpc_mesocollection.dataRequest.behavior', index=0,
      number=1, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
    _descriptor.FieldDescriptor(
      name='version', full_name='grpc_mesocollection.dataRequest.version', index=1,
      number=2, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
    _descriptor.FieldDescriptor(
      name='text_data', full_name='grpc_mesocollection.dataRequest.text_data', index=2,
      number=3, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
    _descriptor.FieldDescriptor(
      name='binary_data', full_name='grpc_mesocollection.dataRequest.binary_data', index=3,
      number=4, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=_b("").decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
  ],略

datatransfer_pb2_grpc主要是定義行為,擷錄如下:

class CollectionDataStub(object):
  """The greeting service definition.
  """

  def __init__(self, channel):
    """Constructor.

    Args:
      channel: A grpc.Channel.
    """
    self.Entry = channel.unary_unary(
        '/grpc_mesocollection.CollectionData/Entry',
        request_serializer=dataRequest.SerializeToString,
        response_deserializer=dataReply.FromString,
        )
    self.Query = channel.unary_unary(
        '/grpc_mesocollection.CollectionData/Query',
        request_serializer=queryRequest.SerializeToString,
        response_deserializer=queryReply.FromString,
        )
略

 

基本上,grpc的code gen,是可以幫我們產生許多基礎的介面類別與框架,實作面還是要我們來介入對接,預設上的行為,在code gen的結果都是raise notimplemented error,所以一定要來這邊繼承覆寫掉

class CollectionDataServicer(object):
  """The greeting service definition.
  """

  def Entry(self, request, context):
    """Sends a data save request
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

  def Query(self, request, context):
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

 

覆寫的code如下,我們可自訂對接的內部服務(這邊為了演示整合段,省略對接服務code)

class RPCService(CollectionDataServicer):
  def Entry(self, request, context):
	
      behavior = request.behavior
      version = request.version
      text_data = request.text_data
      binary_data = request.binary_data
      #todo call Service
      return dataReply(isSuccess=True, message = "OK, behavior="+behavior,  entity_from="test_from", entity_uuid = "test_uuid")

  def Query(self, request, context):

      behavior = request.behavior
      version = request.version
      entity_uuid = request.entity_uuid
      entity_from = request.entity_from

      #todo call Service
      return queryReply( isSuccess=True, message = "OK, behavior="+behavior, payload_text = "A" , payload_file = "B" )

依照官網的example code,我們知道可以透過以下 程式起一個grpc的server,監聽固定的port,同時掛載上去我們剛剛覆寫的類別,命名為grpc_server.py檔案

def serve():
  server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
  add_CollectionDataServicer_to_server(RPCService(), server)
  server.add_insecure_port('[::]:40404')
  server.start()
  try:
    while True:
      time.sleep(_ONE_DAY_IN_SECONDS)
  except KeyboardInterrupt:
    server.stop(0)

if __name__ == '__main__':
  serve()

跑起來看來沒有exception,那來試著呼叫看看吧

先建立grpc_client.py的程式碼


def run(behavior="", text_data="", binary_data=""):
    if behavior != "" and (text_data !="" or binary_data != ""):
        channel = grpc.insecure_channel( 'localhost:40404' )
        stub = CollectionDataStub( channel )
        response = stub.Entry( dataRequest( behavior = behavior, text_data=text_data, binary_data=binary_data ) )
        print( "CollectionDataReceiverStub client received: IsSuccess:%s, Message:%s, Payload:%s " % (response.isSuccess, response.message, response.message ))
    else:
        print("empty payload and behavior!")

if __name__ == '__main__':
    max_count = 10
    start_time = time.time()
    for x in range( 0, max_count ):
        run(  behavior="behavior"+str(x) , text_data="payload"+str(x), binary_data="payload"+str(x))
    print( 'grpc_makeRequest spend %s seconds' % (time.time() - start_time) )

接著對localhost:40404進行測試吧,命名為grpc_client.py檔案

import grpc
import time

from models.protos.datatransfer_pb2 import dataRequest
from models.protos.datatransfer_pb2_grpc import CollectionDataStub


def run(behavior="", text_data="", binary_data=""):
    if behavior != "" and (text_data !="" or binary_data != ""):
        channel = grpc.insecure_channel( 'localhost:40404' )
        stub = CollectionDataStub( channel )
        response = stub.Entry( dataRequest( behavior = behavior, text_data=text_data, binary_data=binary_data ) )
        print( "CollectionDataStub client received: IsSuccess:%s, Message:%s, Payload:%s " % (response.isSuccess, response.message, response.message ))
    else:
        print("empty payload and behavior!")

if __name__ == '__main__':
    max_count = 1000
    start_time = time.time()
    for x in range( 0, max_count ):
        run(  behavior="behavior"+str(x) , text_data="payload"+str(x), binary_data="payload"+str(x))
    print( 'grpc_makeRequest spend %s seconds' % (time.time() - start_time) )

跑起來以後,我設定迴圈是1000次,確實有進到我們的grpc_server的程式去,這代表著grpc這條路已經通了

前略…CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior990, Payload:OK, behavior=behavior990
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior991, Payload:OK, behavior=behavior991
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior992, Payload:OK, behavior=behavior992
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior993, Payload:OK, behavior=behavior993
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior994, Payload:OK, behavior=behavior994
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior995, Payload:OK, behavior=behavior995
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior996, Payload:OK, behavior=behavior996
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior997, Payload:OK, behavior=behavior997
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior998, Payload:OK, behavior=behavior998
CollectionDataStub client received: IsSuccess:True, Message:OK, behavior=behavior999, Payload:OK, behavior=behavior999
grpc_makeRequest spend 4.50599217414856 seconds

不過實際應用到生產環境後的效能如何?相比於restful,資料量大筆數多以及binary型別的資料時效能是否更佳?可否正常的傳輸?會不會有blocking的問題呢?就留待下次分曉吧

在centOS上安裝Docker CE(Communtity Edition) – 記錄

在centOS上安裝Docker CE(Communtity Edition) – 記錄

基本上,copy自連結,純粹記錄指令,因為各家linux的預設程式安裝器不盡相同,這邊記錄centos版本:

移除舊的docker

$ sudo yum remove docker docker-common container-selinux docker-selinux docker-engine docker-engine-selinux

設定repository(Docker CE)

  1. 安裝必要的套件 yum-utils, yum-config-manager以及devicemapper所需要的storage driver:device-mapper-persistent-data and lvm2
    sudo yum install -y yum-utils device-mapper-persistent-data lvm2
  2. 啟用the extras CentOS repository. 確保能存取到docker-c所需要的container-selinux 套件
    $ sudo yum-config-manager –enable extras
  3. 設定 預設取得stable 版本的repository(建議預設,即使你要用edge的版本)
    sudo yum-config-manager \
        --add-repo \
        https://download.docker.com/linux/centos/docker-ce.repo
  4. 選擇性: 啟用edge repository.
    $ sudo yum-config-manager –enable docker-ce-edge
  5. 若要關閉 the edge repository就使用 --disable 標籤,若要重新啟用,就使用enable標籤,以關閉為例:
    $ sudo yum-config-manager –disable docker-ce-edge

 

安裝docker

  1. 更新yum的套件清單
    $ sudo yum makecache fast
  2. 列出指定版本的docker清單
    $ yum list docker-ce.x86_64 –showduplicates |sort -r
  3. 安裝最新版本或是安裝特定版本的docker
    $ sudo yum install docker-ce
    or
    $ sudo yum install docker-ce-<VERSION>

    Warning: If you have multiple Docker repositories enabled, installing or updating without specifying a version in the yum install or yum update command will always install the highest possible version, which may not be appropriate for your stability needs.

  4. 編輯 /etc/docker/daemon.json. 好像是要配置存儲相關的設定,正式環境請見:連結{
    “storage-driver”: “devicemapper”
    }
  5. 啟動Docker.
    $ sudo systemctl start docker
  6. 可以驗證docker是否安裝正確,跑跑看docker的hello world的映象檔吧$ sudo docker run hello-world
Unittest Python – My First Mock Lesson!!How to Mock 3rd party Component!!

Unittest Python – My First Mock Lesson!!How to Mock 3rd party Component!!

過往,我們在寫程式中,常常會去呼叫到參考的函式庫,或是請求外部的服務

這導致在單元測試中,我們會因為不具備其環境或是資源,而陷入難以測試的場景

若以整個系統來看,模組之間也可以說是相對的是一個”單元”吧,最常相依的就是資料!

這種情況,通常我們有的時間的話,可以大費周章的建立數據庫、測試檔案與環境,測試前後準備資料,刪除資料流程。

不過這往往是整測時才會遇到的場景,這邊先不討論。

 

我希望的單元測試是集中著重在自已這端的程式運作是否符合預期!!

管他外部環境怎樣,網路有沒有通、世界是否毀滅都與我暫時無關

mock的技巧就是來拯救我們上述說到的難題與困境,mock是用來模擬相依的外部服務或物件的物件。

也就是我們透過mock來將所有程式外部的相依性解除吧!!這麼說很玄

在python的unittest的函式庫中已有包含

 

這邊直接演示(記錄)程式使用的方式,這個例子有點簡單,但是幾乎完全相依!!

from pywebhdfs.webhdfs import PyWebHdfsClient

class WebHDFSHelper:
    __hdfs_client = None

    def __init__(self, host='', port='', user=''):
        self.__hdfs_client = PyWebHdfsClient(host=host, port=port,
                                             user_name=user)

    def create_dir(self, full_dir_path):
        try:
            #before - do something
            result = self.__hdfs_client.make_dir(full_dir_path)
            #after - do something
        except BaseException:
            #exception - do something
            raise
        return result
	
def test_create_dir_temp(self):
    helper = WebHDFSHelper(host="192.168.65.130", port=50070, user="root")
    result = helper.create_dir( "exchange_test" )
    self.assertEqual(result, True)
    pass

當我們要測試建立目錄這個行為時,我們往往會遇到困難就是我必須去真實連到一個hdfs的環境!!

通常可能開發者本機都會有,或者確保團隊有一些共同的測試環境

但這種相依性會導致,若程式移轉到其他人手上或是環境一變動

測試就會無法運作!

例如:ip變了!

 

這次我們來透過mock的技巧來演示如何將如何模擬這些hdfs服務

@mock.patch('pywebhdfs.webhdfs.PyWebHdfsClient.__init__')
@mock.patch('pywebhdfs.webhdfs.PyWebHdfsClient.make_dir')
def test_create_dir(self, mock_hdfs_client ,mock_init_service):
    mock_init_service.return_value = None
    helper = WebHDFSHelper(host="localhost", port=50070, user="root")
    mock_init_service.assert_called_with(host="localhost", port=50070, user_name="root")


    helper.create_dir( DATA_TEMP_PATH )
    mock_hdfs_client.assert_called_with(DATA_TEMP_PATH)
    pass

首先,我們要先模擬的就是

第一塊WebHDFSHelper下面的初始函式會去依照參數連線到外部的hdfs服務

self.__hdfs_client = PyWebHdfsClient(host=host, port=port,user_name=user)

第二塊是呼叫create_dir的時候的這一行會去呼叫參考物件的make_dir的方法,這個會相依上面那塊初始化後的物件。

result = self.__hdfs_client.make_dir(full_dir_path)

 

所以我們知道這兩塊是我們能獨立單元測試的排除目標

因此我們對應的宣告

@mock.patch(‘pywebhdfs.webhdfs.PyWebHdfsClient.__init__’)
@mock.patch(‘pywebhdfs.webhdfs.PyWebHdfsClient.make_dir’)

並對我們的test_測試函式,注入2組參數

def test_create_dir(self, mock_hdfs_client ,mock_init_service):

前者是我們到時要呼叫make_dir的方法,後者是我們初始化時要mock的行為。

註:這邊mock的patch宣告所對test函式注入的順序為何是倒序,我則不知道

 

接著依我們的場景,mock物件可以做幾件事

1.指定回傳物件,因為是模擬,就可以一併模擬回傳的結果

mock_init_service.return_value = None #若要模擬初始函式,就是回傳值一定是none

2.透過mock物件來確認是否有被呼叫到,連同參數一併帶入,注意參數名稱必須符合原始模擬物件的參數名稱)

mock_init_service.assert_called_with(host=”localhost”, port=50070, user_name=”root”)

 

因此測試如下:

class HDFSHelperTest(unittest.TestCase): 
    @mock.patch('pywebhdfs.webhdfs.PyWebHdfsClient.__init__') 
    @mock.patch('pywebhdfs.webhdfs.PyWebHdfsClient.make_dir') 
    def test_create_dir(self, mock_hdfs_client ,mock_init_service): 
        mock_init_service.return_value = None 
        helper = WebHDFSHelper(host="localhost", port=50070, user="root") 
        mock_init_service.assert_called_with(host="localhost", port=50070, user_name="root") 
        helper.create_dir( DATA_TEMP_PATH ) 
        mock_hdfs_client.assert_called_with(DATA_TEMP_PATH)

我們設個DEBUG中斷點看看,在初始化後,我們的內部變數已經被轉換成一個含有magicmock行為make_dir的物件。

因為初始化這邊沒有其他需要驗證的流程,因此我只去assert是否有照著我給的參數呼叫,這時管他這個環境是不是活著,我只需要知道”若”我給活的環境 就會正常。 若需要驗證例外行為,以這個例子,可以加入更多的例外處理。 這時一樣可以透過mock的return_value raise exception,這是做的到的,然而驗證的東西還是要自行定義,在一般函式環境的話,你可以因為例外給定預設行為,這樣也是有助於提高程式測試的覆蓋率。 例如:

mock_init_service.return_value = None
mock_init_service.side_effect = Exception("環境異常")
helper = WebHDFSHelper( host="localhost", port=50070, user="root" )
mock_init_service.assert_called_with( host="localhost", port=50070, user_name="root" )

如願拋出異常 最後,我們實際測試create_dir吧,第1段,我將他設定為mock_hdfs_client.return_value = True,第2段設定為False

確實會如願的走向1次成功、1次失敗,當然我這邊程式只為了驗證是否如配置去跑,所以只有這樣寫,一般成功、失敗處理上的邏輯差別還是會依原本物件設計的流程來運作 因此要「驗證」的東西與結果,仍然可以依自己的需求去調整與配置。 以上就是python的mock初體驗,以自己的例子還是比較清楚mock技巧的使用場景與方式…有錯的部分請不吝指教,感謝

 

測試程式完整版

from pywebhdfs.webhdfs import PyWebHdfsClient
class WebHDFSHelper:
    def __init__(self, data_path='', host='', port='', user=''):
        try:
            self.__hdfs_client = PyWebHdfsClient(host=host, port=port,
                                                 user_name=user)
        except Exception as err:
            print(str(err))
    
        def create_dir(self, full_dir_path):
            try:
                result = self.__hdfs_client.make_dir(full_dir_path)
                if result:
                    print("成功")
                else:
                    print("失敗")
            except:
                raise
            return result


class HDFSHelperTest(unittest.TestCase):
    @mock.patch('pywebhdfs.webhdfs.PyWebHdfsClient.__init__')
    @mock.patch('pywebhdfs.webhdfs.PyWebHdfsClient.make_dir')
    def test_create_dir(self, mock_hdfs_client, mock_init_service):
        mock_init_service.return_value = None

        mock_init_service.side_effect = Exception("環境異常")
        helper = WebHDFSHelper(host="localhost", port=50070, user="root")
        mock_init_service.assert_called_with(
            host="localhost", port=50070, user_name="root")

        mock_init_service.return_value = None
        mock_init_service.side_effect = None
        helper = WebHDFSHelper(host="localhost", port=50070, user="root")
        mock_init_service.assert_called_with(
            host="localhost", port=50070, user_name="root")
    
        mock_hdfs_client.return_value = True
        helper.create_dir(DATA_TEMP_PATH)
        mock_hdfs_client.assert_called_with(DATA_TEMP_PATH)
    
        mock_hdfs_client.return_value = False
        helper.create_dir(DATA_TEMP_PATH)
        mock_hdfs_client.assert_called_with(DATA_TEMP_PATH)
    pass
[Cassandra] 要如何透過python與CQL來新增User Define Type欄位的資料

[Cassandra] 要如何透過python與CQL來新增User Define Type欄位的資料

Cassandra是一個強大的nosql(在特定集群下資料的throughput可是mongo db的10幾倍),我只知道將其發揚光大的facebook,其歷史淵源我也不知多少,就先在此略過

nosql強調的是資料寫入、查詢的效能,但是為了突顯其效能,nosql有其內部的運作方式,因此有些使用上的特性必須去滿足他或是盡量要照著他的查詢規則,例如…cql的語法裡面,你的where條件必須一定要有partition key,而且還要照順序作條件,下update語法時,更新的條件,你一定要包含partition key,也只能使用=運算子,無法像sql那樣,批次大量地去異動資料(如果有錯,歡迎糾正),而且不是關鍵式資料庫特性,不需要正規化的那麼全面,甚至就是橫向的長下去也沒關系…但是若當資料格式是有1對多關系的時候,我仍然希望能存入到nosql的時候怎麼辦呢?

例如:

訂單order下,可能有很多的order item(購買項目),訂單匯總了金額,購買者,運送地址,購買項目關聯了訂單,並描述商品編號,商品數量,以往在RDB裡面,我們可能會設計2張表,然後透過join與transaction的方式來維護相關的資料。先舉個例子:我隨便設計一個產品明細類,然後再加入一個主表,我希望欄位就有1對多的產品明細關係,因此我訂了一個欄位叫order_items,然後型別就是list加上剛剛訂的order_product_item

 

以這樣的一張表有4個欄位,但是有明細與地址的進階型別,我們要如何新增呢?

若透過cql,只需要寫的像cql,並帶入json like的資料結構,就可以新增到我們的CustomerOrder的表了

insert Into "CustomerOrders" (customer_id, order_amount, order_items, shipping_address)
values(1, 2000, [
		{id:4, name:'game', count:1, price:1600, memo:{'tag': 'action game'}},
		{id:20, name:'toy', count:2, price:200, memo:{'tag': 'made by hand'}}
	   ], {zip_code:'210', nation:'Taiwan', city:'NewTaipeiCity', address1:'testAddress1', address2:''});

查詢結果:

注意,欄位裡面可不是string,而是有指定型別的結構,因此是可以作為後續查詢的條件。在這邊為止…先切入今天希望記錄的主題,就是如何透過python來綁定這種user define type。在傳統sql與程式寫transaction sql時,我們常常會這樣寫

insert into a (fieldA, fieldB) Values(@fieldA, @fieldB)

然後帶入paramter的方式,來防止sql injection與型別判斷,而cql呢…他也可以做到類似這樣的寫法:

insert into a (fieldA, fieldB) Values(:fieldA, :fieldB)
insert into a (fieldA, fieldB) Values(?, ?)

然後定義一個字典物件,一併丟給cassandra的driver:session去執行,就可以如我們以前在sql常看到的參數binding一樣,去執行語法並新增

parameter = dict( fieldA=fieldA_Value, fieldBcreator=fieldB_Value)

然而若是user define type的話,如何解呢?list物件,應該就是對應到python的list,應該沒問題,那其他的欄位,有辦法對應下一層dictionary嗎? 試了老半天都是卡關…但是不確定是卡在動態參數binding那段,還是字典裡型別的問題 後來發現了關鍵的官網文件在此:https://datastax.github.io/python-driver/user_defined_types.html 擷圖如下 什麼!!竟然可以直接繼承object物件,然後定義一個初始化方法,指定所有user define type的名稱(帶入的型別由外面檢查) 就可以做到?!我一開始還想試另一條路(有空再說吧…),繼承cassandra usertype型別,使用它的orm方式來做,但是看起來用那個方法無法綁cql一起運作,他有他的獨立運作方式。

 

看了他的試範讓我馬上試著建立類別,並如上圖的方式,直接帶parameter定義的欄位,以我們訂單的例子:

class Address(object):
   def __init__(self, zip_code, nation, city, address1, address2): #欄位都要有,名稱都要正確
       self.zip_code=zip_code
       self.nation=nation
       self.city = city
       self.address1 = address1
       self.address2 = address2
       
class OrderItem(object):
    def __init__(self, id, name='', count=0, price=0, memo=None):  # 欄位都要有,名稱都要正確
        self.id = id
        self.name = name
        self.count = count
        self.price = price
        self.memo = memo
cql = """
insert Into "CustomerOrders" (customer_id, order_amount, order_items, shipping_address)
values(:customer_id, :order_amount, :order_items, :address);
"""

order_items = []
order_items.append(OrderItem(4, "game", 1, 1600, {"tag":"action game"}))
order_items.append(OrderItem(20, "toy", 2, 200, {"tag":"hand made"}))


parameter = dict(
    customer_id=1,
    order_amount=2000,
    order_items=order_items,
    address=Address(zip_code=970, nation="Taiwan", city="NewTaipeCity", address1="Test", address2=""))

session.execute(cql, parameter)

下指令的方式已經像使用sql一樣簡單,而透過上述user define type的定義與python的程式操作可以更靈活的使用cassandra這個強大nosql的特性囉。只是回頭看看自己…捚頭苦幹…這個議題n小時,真的有點吃力不討好,在此紀念我今天try error浪費的光陰,不如官方文件看仔細一點…

ps: Word Press有沒有什麼可以插入code可以用很好的樣式來呈現的外掛呀…改天來研究一下…(先試試這個pastacode…相容於php7)

Docker映像與資料備份與還原:以cassandra為例

Docker映像與資料備份與還原:以cassandra為例

今天開發的時候遇到一個情況,就是我的測試程式依賴了我本機起的一個nosql cassandra服務,是建立在docker上的

因此我把程式提供給同事以後,發現他沒有環境,因此程式功能無法展示,當然一部份也要展示cassandra的使用方式

 

一時間還不會docker的備份與移轉,因此早上就急著先把vmware的images(約略15GB)複製給同事,光複製的時間就花了快二十分鐘,當下覺得這種環境移轉的方式太過於笨重,想到若我還要再把我本機的另一個安裝hadoop的vm也要copy過去的話,那這樣來來回回的時間就不知耗費掉多少,而且我的私人開發環境其實是我自己常用的密碼,所以分享起來也是挺麻煩的。

 

有上述念頭後,我開始嘗試著在我兩台不同的vm(都是ubuntu)上面,試著把docker備份流程做一次

docker images備份(起手式:連結)

首先,我們需要先將目前的container狀態commit,先docker -ps列出所有container的id

我們很快的就看到dev-cassandra就是我們要備份的目標,接著輸入以下的指令:docker commit -p  {{container id}} {{repository-name}}

sudo docker commit -p  3b5 cassandra-devenv

接著回到docker images列表,我們就會看到我們的新commit的images已經被建立出來了(圖中第一個)

container已經被commit了以後,那我們要怎麼轉成實體檔案呢?

指令很簡單:

docker save -o ~/cassandra-devenv.tar cassandra-devenv

這個指令要跑一下子,跑完以後,我們可以看到tar檔已經產生,看起來要400mb左右,著實比vm動輒1x gb來得輕量許多

這裡有一個小陷阱,就是要先去chown給paul與chmod 給檔案合適的權限

 

備份docker的資料卷資料

images既然被備份出來以後,那我們的資料呢?

還記得dockers要怎麼看資料卷的現實綁定的位置呀?就要靠docker inspect 指令了

sudo docker inspect 3b5

看到container的環境資訊後,會發現幸好這個dockerfile有將資料獨立出來,在mount的字段下,我們可以看到/var/lib/cassandra的根目錄,被指派在我們本機(ubuntu)的/var/lib/docker/volumes/31a88f59c57b5a75b98f9c4e6a539e4140cb226760fc540b15b930d96010a84c/_data下。
(註:這邊不同系統的資料或設定目錄要視系統而定,不見得每個系統都有必要這樣做切割,但大部分有這樣切割設定的images佈署的彈性才會更好,也才會有更好使用率)

既然知道了cassandra相關檔案的實體位置,我們先將它備份壓縮起來吧。我已經可以想像等下要連同docker image的檔案與data要壓縮複製過去另一台主機。

先備份到個人目錄:

sudo cp -r /var/lib/docker/volumes/31a88f59c57b5a75b98f9c4e6a539e4140cb226760fc540b15b930d96010a84c /home/paul

記得要下-r,才會recursive的複製所有子目錄、檔案

這時/home/paul的目錄下就有了31a88f59c57b5a75b98f9c4e6a539e4140cb226760fc540b15b930d96010a84c

接著我們來進行打包成tar檔,tar的壓縮指令請參考:連結

tar -zcvf ./cassandra-data.tar ./31a88f59c57b5a75b98f9c4e6a539e4140cb226760fc540b15b930d96010a84c

建議可以使用加上-z的參數,代表,透過gzip的方式壓縮,若沒有-z或-j的話,是代表僅打包,不壓縮

ls -l看看壓完以後只剩下33MB,看起來效果不錯

 

既然都已經備份出來了,包含docker與資料的壓縮檔

那我們要怎麼從vm複製檔案到另一個vm呀?在我還沒學會scp指令前,我還真的只知道vm的linux開分享samba,然後透過windows去copy到另一台分享出來的目錄

以上動作,真的只需要1行指令就做的到了,那就是scp,我們先ssh到另一台目標移置的vm去!!

輸入scp指令看看使用方式:

usage: scp [-12346BCpqrv] [-c cipher] [-F ssh_config] [-i identity_file]
[-l limit] [-o ssh_option] [-P port] [-S program]
[[user@]host1:]file1 … [[user@]host2:]file2

scp是secure copy的簡寫,因此我們可以從首n台主機host 1 ~host n的檔案,複製到local來,以我們的例子來說,我們只要切到目標主機的指令目錄以後,照著以下輸入

同意建立ecdsa以後,接著輸入遠端的密碼,就會發現他開始copy了:

 scp [email protected]:/home/paul/cassandra-data.tar ./ [email protected]:/home/paul/cassandra-devenv.tar ./ 

restore docker的image

都搞定以後,我們輸入以下指令,就可以將這個備份load到這台vm的docker的images

sudo docker load -i ./cassandra-devenv.tar

不過怪異的是,不知有沒有參數可以指令輸入的repository name與tag,我load進來以後,預設是none。接著考慮本機的環境,我希望將cassandra的data目錄換一個位置:就到my/own/datadir吧,這邊沒有絕對名稱,只需要自己分的清楚,解壓縮以後,按照當初打包的相對位置先放置好:

tar -zxvf cassandra-data.tar

註:這邊我後來想想若在壓縮時,有透過-z的參數進行壓縮的話,建議命名就要是tar.gz,否則解壓端應該很難知道需不需要加上-z的參數,這邊因為都是我幹的,所以我當然知道要這樣搞。通通都解到目標目錄,接著看看目錄內容是不是都有了
(註:這邊我還省略了一些解壓目錄以後,再從目錄把檔案cp或mv到my/own/datadir的過程)

接著只要輸入神奇的docker run指令,就會發現

 sudo docker run –name cassandra-dev -p 9042:9042 -v /my/own/datadir:/var/lib/cassandra -d e86

檢查一下,sudo docker ps,已經正常跑起來了,這邊其實我有發現,cassandra的服務通常都要跑三分鐘左右,有時initialize掛掉都是發生在2~3分之間,通常超過3分鐘,應該就是起來了

CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
2cf14dc1d4d0 e86 “/docker-entrypoint.s” 6 hours ago Up 6 hours 7000-7001/tcp, 7199/tcp, 9160/tcp, 0.0.0.0:9042->9042/tcp cassandra-dev

 

實際連到192.168.65.130(剛剛是從136備份過來)的cassandra來看看囉!嗯~資料看起來都有被複製過來~暫時大功告成!特此記錄一番

 

後記:每次要寫這種過程重現真的是很耗費時間(也整理了差不多1個多小時),我發現,最佳的記錄方式,其實就是當我們在試著學習與除錯的時候(若有那個餘裕啦)

就定期把相關的畫面與做過的事情,記錄到evernote或其他方便的地方,事後的整理事實上真的事半功倍…若下次有機會遇到的話…希望有機會想到…

 

autopep8 – pep8 auto styling tool 如何整合到Pycharm

autopep8 – pep8 auto styling tool 如何整合到Pycharm

身為一個半路出家的python幼幼班,常常為了轉換語言,而在命名、排版上與直覺發生出入

python有一個python pep8 style(相關介紹)

為了盡量符合pep8的style,這次我先決定套用工具幫我自動排版,希望等到這種排版看久了,自然也會內化吧orz

首先在cmd上輸入:(註:請先使用系統管理員執行cmd,否則會權限不足)

pip install autopep8

 

接著開啟pycharm,點選上方的導覽列file->settings 切換到tools / external tools的頁籤

接著新增一個external tools,參考以下輸入:

Name, Description都可以依自己喜好設定,tool settings這邊請照以下輸入

program: autopep8
parameters: –in-place –aggressive –aggressive $FilePath$ (–in-place代表是直接對該檔案調整,依測試是不可逆的)
working directory: $ProjectFileDir$

以上希望知道參數說明,可以自己打autopep8 -h 研讀


 

其中output filters來新增一組regular exp

範例:$FILE_PATH$\:$LINE$\:$COLUMN$\:.*

以上都設定好以後,我們在程式原始檔上,右鍵:external tools,這時會發現有autopep8的選項可以點

autopep8 –in-place –aggressive –aggressive C:\Users\paul\Desktop\WorkFolder\collection_proxy\data_transfer.py

 

我可以看到console其實他背後就是對指定檔案下以上指令,因此自己也可以這樣去下或批次調整,若希望不要改到原始檔案的話,請記得拿掉–in-place指令

想自行輸出的話,可以參考類似以下語法

autopep8 –aggressive –aggressive C:\Users\paul\Desktop\WorkFolder\meso_collection\meso_collection_proxy\data_model\exchange\exchange_data.py > ./exchange_data_new.py

Before

After

其實格式化後,至少我離python的常用排版更貼近了一些,至於程式可讀性這種東西,我覺得還是見仁見智,必竟程式碼閱讀主要是團隊裡的人看的懂,後續接手維護的人員看的懂最重要,因此程式最重要的還是首重命名,再來排版只要順眼,當然團隊中每個人都可能有自己順眼的方式,但要要找到一個共識,其實沒有絕對的標準答案的