分類: 我的測試食譜

壓力測試到底在關注什麼?

壓力測試到底在關注什麼?

最近請同事開始使用工具做壓力測試,一開始講目的是為了”知道”我們的API能否承受一定的使用者使用我們的系統,至於多少使用者呢?假定是1000個線上用戶為目標。但過了一陣子,發現大家似乎 對於要測什麼,怎麼測,測完後的行動,壓力測試跟效能測試差異,都還有一些疑慮。導致無法有效的進行。

先講”相對”最簡單的議題,怎麼測,首先應該是依賴工具,有各種技術方案,有的用JMeter,有的用Script的Tool例如K6。端看想要如何測試往下延續或保持這種壓測的節奏。

JMeter的考量可能是有GUI的壓測劇情設定編排的畫面,對於比較不寫程式的,也有機會去達成壓測的工作。一樣可以做到算是API間的相依,像是從Login拿Token,再拿Token去打 其他API做驗證後的邏輯。這些都做的到,報表也有整合好可以看到一些圖表數據。這些數據我就先不討論了,都是有定義的。

其他工具例如K6,就可以讓你用JavaScript的方式寫腳本,方便用程式的方式 自動化 的,重覆使用化的去設計你的壓測流程,這是好處,不過就會有一點點門檻。(這對程式有點基礎的就學習上相對簡易很多)

再來一層就是我們要討論 為什麼要壓力測試,壓力測試的對象從不同的面象又可以做一個粗淺的分類,例如:

  1. front-end 或是 back-end
  2. 元件或是 end 2 end 測試
  3. 單一協定架構或混合架構。

上述分類不代表什麼,只是說明你可能可以整體來看你測試對象的分類為何 ,因為能錨定你的對象跟你選擇的工具,腳本的寫法都有些關係。不過我認為,這些還是可以概念上分成由上而下拆解,還是由下而上堆疊來驗證。

由上而下,不外乎還是把你的測試對象視為一個黑盒子。可能你知道中間的構成是什麼,也可能不知道或不確定,因此透過 測試,可以幫助你推論 瓶頸在何處,但都要數據來證明。愈複雜的組成有可能有愈多的依賴。因此你一開始愈來,拆的成本就愈高。

由下而上,我認為是你可以透過很多碎片/單元化的可驗證組件逐一測試。這些分開的成本也不小。疊到符合”使用場景”要花的時間,是你需要考慮的。例如假如知道一隻極度單純的API查詢 資料庫1次的數據(先知道環境依賴的跟與業務邏輯無關的部分),其實你可以往下驗證到,例如你的登入,背後可能是1隻API結合查詢 3次 ,寫入1次 ,這樣的子的邏輯複雜度。在你設定的壓力狀況下的成績如何。再來逐步的疊加可能登入Api+查詢Api複合性的商業邏輯,不過注意必須合理,若網站所有Api不考慮相關性,相依性 平均的去壓測他們,這樣的測試也是既耗時也浪費的。

因此從上述的方向,並不是說你只能走一個方向,而是你要怎麼透過”不同的角度”去找答案!

以API的部分,舉個例子:

一開始 可能設想由上而下,我們現在電商網站最常被使用是登入嗎?可能不是,大家要嘛就已經登入了,user資訊存在cookie中 ,要嘛大家就在逛網站,而不用急著登入。那麼登入真的是最需要被驗證的接口嗎?其實可能不是。這些都端看你網站內的流程如何設計。那往對應到流程後,背後有哪些”最相關的Api分析出來後,就可能是你首要的驗證對象。

因此 使用者場景=分析=>關鍵路徑(常用不一定關鍵)=分解=>背後關鍵的API順序。這個過程去識別出測試對象,就是很重要的。

大則流程,細則程式,你也可以想象,這是一個識別風險的過程,並透過科學的方式證明他們是或不是風險,每次開發好一個API功能後,可能先單獨 驗證這隻API的效能,再接著從這次 新增的Api去往外推論影響的範圍。

最後,聊一下由下而上的概念,這邊舉個例子,就是測試參數到底怎麼決定比較好,看了k6的vus跟duration還有iteration後,我發現測試上會跟現實狀況會有脫勾的情況,例如你要測 2000 concurrent user,你的想像是一秒內同時打 2000個請求,都能被消耗,並能被快速的執行完成,是這樣嗎?往往 使用者情境不見得是這樣,就算是搶票,我們還是可以區分出讀寫比重不一定一樣的行為模式,因此結合使用者情境或是能透過你的情境去解釋,就會各自有差別,不過我想說的以本機測試為例,我們其實是希望知道受測單元的極限。例如1個直接回傳 200的api接口,在一台PC等級的筆電上,能驗證的vus同時併發量就是440vus(因為超過可能就會”環境”failed,是server被reset connection的那種),然後平均每秒 處理請求數是多少,這些都可以先針對你的技術框架有一個單元極限值,然後再往 疊加業務邏輯的方式,去收斂那個參數邊界,最後找到一個合理 測試參數,接著就是固定這個變數,往再去尋找其他可控 變數,例如組合不同邏輯下,對效能、可用性的影響為何 。

知道作法跟想做的方向後,再來看我們要怎麼解讀壓力測試API的結果來回答我們的疑問,現學現賣一下,不外乎想知道兩個面向

  1. Latency(延遲)
  2. Availability(可用性)

從Latency 延遲角度,可以結合Performance的角度就是為了知道有壓力源下的回應時間的分佈狀況。例如你可以想像在早上8:00的時候,使用系統的人最多,可能有1000人,平均等待時間會從100人的平均3秒成長到平均10秒 ,而這樣的延遲是否可以”忍受”。若不能忍受就要提高服務器的性能或是做其他的分流機制。

從Availability可用性的角度,主要是想知道API是否會中止服務,而導致關鍵流程無法完成。因此我認為就是在看 Total跟Failed的比例。假如Fail是3次 ,但是是100萬次的3次,跟10次 裡的3次 ,狀況可能完全不一樣。(但這不代表 100萬的3次 不重要)

可以想像一個情境,假如你有一個搶票的API,公告12:00開始可以進來搶100張票,結果假如你只有一台主機服務,結果因為開搶那一瞬間,湧進來10000個人來搶。瞬間把API打到當機,背後的資料庫也阻塞中止新連線,結果整個網站不要說進來了,連前面有成功進來的人也無法往下走到”金流”的流程,那麼就是典型的可用性出了很大的問題。

而這些都是可以”提前”大膽的假設,透過壓測工具來求證。那麼如何使用工具來驗證場景呢?K6官網有整理了幾種壓測的情境,來因應上述的場景
1.Smoke Test
2.Average Test
3.Stress Test
4.Spike Test
5.Soak Test

其中或許我們只要分辨出,SmokeTest目的是用小單位去驗收你的測試腳本是否正確,先行排除在重點之外。

而Average, Stress, Spike都是在給定一個壓力源(這個也是要設計的)下 ,壓力可能是量,可能是時間長(Soak),到底回應時間以及可用性如何。

上面先啦賽了這麼多,就是希望有助於結構性的先理解我們面對的東西是什麼,概念跟術語是什麼,先做好定義同步共識後,往下做的Action才有理有據,而每一次做的壓測都有如一場實驗,留下的數據,就算是沒有瓶頸的發現,那保留下來的可能那組”安全”參數、數據上的經驗都應該要能夠被延續,而不是把每個壓測都視為那麼單一的事件去操作。每個產品,每個階段,每個功能都不盡相同,但分析與解決問題的脈落其實都有一套科學思維的pattern,數據也有解讀的方式。解讀出來後,如何往下分析瓶頸(依賴)跟重構,那屬於壓測後的獨立工作,就先不在這邊一併討論了。

實作RSA + AES 的檔案加密算法POC

實作RSA + AES 的檔案加密算法POC

為了做到檔案的加密(後來才知道其實用GPG就可以了)

一開始我只是想說用現成套件不同的 金鑰key值作為參數,結合ChatGpt就開始做POC看看了

大不了是換演算法的問題,結果其實數位簽章 除了格式、作業系統管理的模式、公私鑰檔案其實也有不同的規定

但大至上心得是一開始想說讓 檔案加密,而且是透過Public Key進行,然後對方用 Private Key解密,所以我出發點就是用RSA演算法去進行加密,結果發現怎麼都會發生長度錯誤的問題。

後來深究後才發現RSA的加密無法對長度太大的東西進行,所以後來換成短字串果然就可以了

因此思考了加解密該做的事情後,就做了一個AES + RSA的POC,RSA主要用來加密 AES的Key

後來果然發現這個跟GPG的作法有點像~~

實作POC如下∶
using System.Security.Cryptography;

string keyBase64 = string.Empty;
using (Aes aesAlgorithm = Aes.Create())
{
aesAlgorithm.KeySize = 256;
aesAlgorithm.GenerateKey();
keyBase64 = Convert.ToBase64String(aesAlgorithm.Key);
Console.WriteLine($”Aes Key Size : {aesAlgorithm.KeySize}”);
Console.WriteLine(“Here is the Aes key in Base64:”);
Console.WriteLine(keyBase64);
Console.ReadKey();
}

string filePath = “/Users/paul_huang/Desktop/FileEncryptTest”;
GenerateKeys(filePath);

// 設定公鑰和私鑰檔案路徑
string publicKeyPath = Path.Combine(filePath, “publicKey.pem”);
string privateKeyPath = Path.Combine(filePath, “privateKey.pem”);

// 設定金鑰的位元數
var keyString = keyBase64;

// 檢查檔案是否存在
if (!File.Exists(publicKeyPath) || !File.Exists(privateKeyPath))
{
Console.WriteLine(“找不到金鑰檔案。請確保金鑰檔案存在。”);
return;
}

// 讀取公鑰
string publicKey = File.ReadAllText(publicKeyPath);

// 讀取私鑰
string privateKey = File.ReadAllText(privateKeyPath);

// 設定要加密的檔案路徑

// 加密檔案
var keyFileName = “key.txt”;
var keyFilePath = Path.Combine(filePath, keyFileName );
if (File.Exists(keyFilePath) == false)
{
File.WriteAllText(keyFilePath, keyString);
}
else
{
keyString = File.ReadAllText(keyFilePath);
}

EncryptKeyFile(keyFilePath, publicKey);

DecryptKeyFile($”{keyFilePath}.encrypted”, privateKey);


// var encryptedKey = $”{keyFilePath}.encrypted”;

var encryptFile = “paul.jpg”;
var encryptFilePath = Path.Combine(filePath, encryptFile);


var encryptedFileContentString = Encrypt(Convert.ToBase64String(File.ReadAllBytes(encryptFilePath)), keyString);
File.WriteAllText(encryptFilePath+”.encrypted”, encryptedFileContentString);

File.WriteAllBytes(encryptFilePath + “decrypted”, Convert.FromBase64String(Decrypt(encryptedFileContentString, keyString)));

static void EncryptKeyFile(string filePath, string publicKey)
{
try
{
byte[] dataToEncrypt = File.ReadAllBytes(filePath);

using var rsa = new RSACryptoServiceProvider(2048);
rsa.FromXmlString(publicKey);

byte[] encryptedData = rsa.Encrypt(dataToEncrypt, false);

// 寫入加密後的檔案
File.WriteAllBytes($”{filePath}.encrypted”, encryptedData);

Console.WriteLine(“檔案已成功加密。”);
}
catch (Exception ex)
{
Console.WriteLine($”加密時發生錯誤: {ex.Message}”);
}
}

static void DecryptKeyFile(string encryptedFilePath, string privateKey)
{
try
{
byte[] encryptedData = File.ReadAllBytes(encryptedFilePath);

using (RSACryptoServiceProvider rsa = new RSACryptoServiceProvider(2048))
{
rsa.FromXmlString(privateKey);

byte[] decryptedData = rsa.Decrypt(encryptedData, false);

// 寫入解密後的檔案
File.WriteAllBytes($”{encryptedFilePath}.decrypted”, decryptedData);

Console.WriteLine(“檔案已成功解密。”);
}
}
catch (Exception ex)
{
Console.WriteLine($”解密時發生錯誤: {ex.Message}”);
}
}

static string Encrypt(string plainText, string key)
{
using (Aes aesAlg = Aes.Create())
{
aesAlg.KeySize = 256; // Set key size to 256 bits
aesAlg.Key = Convert.FromBase64String(key);
aesAlg.IV = new byte[16]; // AES uses a 128-bit IV

ICryptoTransform encryptor = aesAlg.CreateEncryptor(aesAlg.Key, aesAlg.IV);

using (MemoryStream msEncrypt = new MemoryStream())
{
using (CryptoStream csEncrypt = new CryptoStream(msEncrypt, encryptor, CryptoStreamMode.Write))
{
using (StreamWriter swEncrypt = new StreamWriter(csEncrypt))
{
swEncrypt.Write(plainText);
}
}

return Convert.ToBase64String(msEncrypt.ToArray());
}
}
}

static string Decrypt(string cipherText, string key)
{
using (Aes aesAlg = Aes.Create())
{
aesAlg.KeySize = 256; // Set key size to 256 bits
aesAlg.Key = Convert.FromBase64String(key);
aesAlg.IV = new byte[16];

ICryptoTransform decryptor = aesAlg.CreateDecryptor(aesAlg.Key, aesAlg.IV);

using (MemoryStream msDecrypt = new MemoryStream(Convert.FromBase64String(cipherText)))
{
using (CryptoStream csDecrypt = new CryptoStream(msDecrypt, decryptor, CryptoStreamMode.Read))
{
using (StreamReader srDecrypt = new StreamReader(csDecrypt))
{
return srDecrypt.ReadToEnd();
}
}
}
}
}

static void GenerateKeys(string path)
{
try
{
using (RSACryptoServiceProvider rsa = new RSACryptoServiceProvider(2048))
{
// 公鑰
string publicKey = rsa.ToXmlString(false);
Console.WriteLine(“公鑰:”);
Console.WriteLine(publicKey);

// 私鑰
string privateKey = rsa.ToXmlString(true);
Console.WriteLine(“\n私鑰:”);
Console.WriteLine(privateKey);

// 將金鑰寫入檔案
File.WriteAllText(Path.Combine(path, “publicKey.pem”), publicKey);
File.WriteAllText(Path.Combine(path, “privateKey.pem”), privateKey);

Console.WriteLine(“\n金鑰已成功生成並保存至檔案。”);
}
}
catch (Exception ex)
{
Console.WriteLine($”生成金鑰時發生錯誤: {ex.Message}”);
}
}

static string GenerateRandomKey(int length)
{
// 使用 RNGCryptoServiceProvider 產生安全的亂數
using (var rng = new RNGCryptoServiceProvider())
{
// 建立位元組陣列來存放隨機資料
byte[] randomBytes = new byte[length];

// 將亂數填入位元組陣列
rng.GetBytes(randomBytes);

// 將位元組轉換成十六進位字串
string randomKey = BitConverter.ToString(randomBytes).Replace(“-“, “”);

return randomKey;
}
}

壓測工具的新選擇!! Locust load test framework

壓測工具的新選擇!! Locust load test framework

繼上一篇,自建nginx分流後(傳送門),接著的問題就是要如何驗證分流效果與實際上這樣架構的負載吞吐量到底能到多少了。傳統想到壓測,我們就會想到jmeter,但是jmeter的參數配置繁多,而且若要做到可程式化的話其路可能相當曲折;再者,若在我們實體主機上測試的話,極有可能受限於其物理極限,包含頻寬與網卡的能力,因此我搜尋了一下python有沒有load test framework,又可以同時支援分散式的load test

 

結果發現了這套名叫:locust的套件!!其安裝方式相當的簡單:

首先,整份框架都是python寫的,透過pip就可以安裝,因此先來試著打包成映像檔,變成隨時可以使用的容器吧:

測試Script寫起來相當簡潔,我們直接存成locustfile.py,實作HttpLocust、TaskSet後,屆時會在locust的壓測ui介面,讀取到接口

from locust import HttpLocust, TaskSet, task


class WebTest(TaskSet):

    @task
    def get_uuid(self):
        self.client.get("/webtest?action=get_uuid")


class WebsiteTest(HttpLocust):
    task_set = WebTest

Dockerfile的內容:

FROM python:3.6-alpine

COPY docker-entrypoint.sh /

RUN    apk --no-cache add --virtual=.build-dep build-base \
    && apk --no-cache add libzmq \
    && pip install --no-cache-dir locustio==0.8.1 \
    && apk del .build-dep \
    && chmod +x /docker-entrypoint.sh

RUN  mkdir /locust
WORKDIR /locust
EXPOSE 8089 5557 5558

ENTRYPOINT ["/docker-entrypoint.sh"]

docker-entrypoint.sh的內容:

#!/bin/sh
set -e
LOCUST_MODE=${LOCUST_MODE:-standalone}
LOCUST_MASTER_BIND_PORT=${LOCUST_MASTER_BIND_PORT:-5557}
LOCUST_FILE=${LOCUST_FILE:-locustfile.py}

if [ -z ${ATTACKED_HOST+x} ] ; then
    echo "You need to set the URL of the host to be tested (ATTACKED_HOST)."
    exit 1
fi

LOCUST_OPTS="-f ${LOCUST_FILE} --host=${ATTACKED_HOST} --no-reset-stats $LOCUST_OPTS"

case `echo ${LOCUST_MODE} | tr 'a-z' 'A-Z'` in
"MASTER")
    LOCUST_OPTS="--master --master-bind-port=${LOCUST_MASTER_BIND_PORT} $LOCUST_OPTS"
    ;;

"SLAVE")
    LOCUST_OPTS="--slave --master-host=${LOCUST_MASTER} --master-port=${LOCUST_MASTER_BIND_PORT} $LOCUST_OPTS"
    if [ -z ${LOCUST_MASTER+x} ] ; then
        echo "You need to set LOCUST_MASTER."
        exit 1
    fi
    ;;
esac

cd /locust
locust ${LOCUST_OPTS}

單一台load test測試端 啟動指令 (驗證ok):

sudo docker run --name locust-master --hostname locust-master \
--network="webtest" \
-p 8089:8089 -p 5557:5557 -p 5558:5558 \
-v /home/paul/webtest/locust:/locust \
-e ATTACKED_HOST='http://web-test-nginx:10000' \
grubykarol/locust

分散式的測試端配置(待驗證):

啟動指令-master測試端:

docker run --name master --hostname master `
 -p 8089:8089 -p 5557:5557 -p 5558:5558 `
 -v c:\locust-scripts:/locust `
 -e ATTACKED_HOST='http://master:8089' `
 -e LOCUST_MODE=master `
 --rm -d grubykarol/locust

啟動指令-slave測試端:

docker run --name slave0 `
 --link master --env NO_PROXY=master `
 -v c:\locust-scripts:/locust `
 -e ATTACKED_HOST=http://master:8089 `
 -e LOCUST_MODE=slave `
 -e LOCUST_MASTER=master `
 --rm -d grubykarol/locust
docker run --name slave1 `
 --link master --env NO_PROXY=master `
 -v c:\locust-scripts:/locust `
 -e ATTACKED_HOST=http://master:8089 `
 -e LOCUST_MODE=slave `
 -e LOCUST_MASTER=master `
 --rm -d grubykarol/locust

 

容器執行後,打入http://xxx.xxx.xxx.xxx:8090,就可以連到對應的監控網址(這個是最棒的),一開始就會問你模擬的u數,以及你希望多久時間內要衝到該u數

摘要總表:

即時圖表:

錯誤分析:

 

同時,我去驗證了我們mongodb的log,是符合他load test的請求數量

 

以上是這次接觸到新的壓測工具實作的小小記錄,也推薦給大伙

 

參考:

https://locust.io/

https://docs.locust.io/en/stable/

https://medium.com/locust-io-experiments/locust-io-experiments-running-in-docker-cae3c7f9386e

 

 

 

Testing and Deployment 概念圖

Testing and Deployment 概念圖

測試與佈署-自動化

jenkins是另一個主要課題,另一部分,就是環境配置檔的管理…

先前公司的做法是有一個環境配置資料庫,可以讓人員透過介面調整key/value的參數值

當jenkins deploy的時候,會自動replace掉程式中固定的參數(聽說好像是全面性,連程式敘述都會被置換掉)。

這一塊也還沒有想法,目前應該只有在跟git整合上而己,先讓程式可以boxing到docker container就好

若有緣人看到這個架構認為有什麼疑惑或是想法的話,歡迎來噹

Unittest Python – My First Mock Lesson!!How to Mock 3rd party Component!!

Unittest Python – My First Mock Lesson!!How to Mock 3rd party Component!!

過往,我們在寫程式中,常常會去呼叫到參考的函式庫,或是請求外部的服務

這導致在單元測試中,我們會因為不具備其環境或是資源,而陷入難以測試的場景

若以整個系統來看,模組之間也可以說是相對的是一個”單元”吧,最常相依的就是資料!

這種情況,通常我們有的時間的話,可以大費周章的建立數據庫、測試檔案與環境,測試前後準備資料,刪除資料流程。

不過這往往是整測時才會遇到的場景,這邊先不討論。

 

我希望的單元測試是集中著重在自已這端的程式運作是否符合預期!!

管他外部環境怎樣,網路有沒有通、世界是否毀滅都與我暫時無關

mock的技巧就是來拯救我們上述說到的難題與困境,mock是用來模擬相依的外部服務或物件的物件。

也就是我們透過mock來將所有程式外部的相依性解除吧!!這麼說很玄

在python的unittest的函式庫中已有包含

 

這邊直接演示(記錄)程式使用的方式,這個例子有點簡單,但是幾乎完全相依!!

from pywebhdfs.webhdfs import PyWebHdfsClient

class WebHDFSHelper:
    __hdfs_client = None

    def __init__(self, host='', port='', user=''):
        self.__hdfs_client = PyWebHdfsClient(host=host, port=port,
                                             user_name=user)

    def create_dir(self, full_dir_path):
        try:
            #before - do something
            result = self.__hdfs_client.make_dir(full_dir_path)
            #after - do something
        except BaseException:
            #exception - do something
            raise
        return result
	
def test_create_dir_temp(self):
    helper = WebHDFSHelper(host="192.168.65.130", port=50070, user="root")
    result = helper.create_dir( "exchange_test" )
    self.assertEqual(result, True)
    pass

當我們要測試建立目錄這個行為時,我們往往會遇到困難就是我必須去真實連到一個hdfs的環境!!

通常可能開發者本機都會有,或者確保團隊有一些共同的測試環境

但這種相依性會導致,若程式移轉到其他人手上或是環境一變動

測試就會無法運作!

例如:ip變了!

 

這次我們來透過mock的技巧來演示如何將如何模擬這些hdfs服務

@mock.patch('pywebhdfs.webhdfs.PyWebHdfsClient.__init__')
@mock.patch('pywebhdfs.webhdfs.PyWebHdfsClient.make_dir')
def test_create_dir(self, mock_hdfs_client ,mock_init_service):
    mock_init_service.return_value = None
    helper = WebHDFSHelper(host="localhost", port=50070, user="root")
    mock_init_service.assert_called_with(host="localhost", port=50070, user_name="root")


    helper.create_dir( DATA_TEMP_PATH )
    mock_hdfs_client.assert_called_with(DATA_TEMP_PATH)
    pass

首先,我們要先模擬的就是

第一塊WebHDFSHelper下面的初始函式會去依照參數連線到外部的hdfs服務

self.__hdfs_client = PyWebHdfsClient(host=host, port=port,user_name=user)

第二塊是呼叫create_dir的時候的這一行會去呼叫參考物件的make_dir的方法,這個會相依上面那塊初始化後的物件。

result = self.__hdfs_client.make_dir(full_dir_path)

 

所以我們知道這兩塊是我們能獨立單元測試的排除目標

因此我們對應的宣告

@mock.patch(‘pywebhdfs.webhdfs.PyWebHdfsClient.__init__’)
@mock.patch(‘pywebhdfs.webhdfs.PyWebHdfsClient.make_dir’)

並對我們的test_測試函式,注入2組參數

def test_create_dir(self, mock_hdfs_client ,mock_init_service):

前者是我們到時要呼叫make_dir的方法,後者是我們初始化時要mock的行為。

註:這邊mock的patch宣告所對test函式注入的順序為何是倒序,我則不知道

 

接著依我們的場景,mock物件可以做幾件事

1.指定回傳物件,因為是模擬,就可以一併模擬回傳的結果

mock_init_service.return_value = None #若要模擬初始函式,就是回傳值一定是none

2.透過mock物件來確認是否有被呼叫到,連同參數一併帶入,注意參數名稱必須符合原始模擬物件的參數名稱)

mock_init_service.assert_called_with(host=”localhost”, port=50070, user_name=”root”)

 

因此測試如下:

class HDFSHelperTest(unittest.TestCase): 
    @mock.patch('pywebhdfs.webhdfs.PyWebHdfsClient.__init__') 
    @mock.patch('pywebhdfs.webhdfs.PyWebHdfsClient.make_dir') 
    def test_create_dir(self, mock_hdfs_client ,mock_init_service): 
        mock_init_service.return_value = None 
        helper = WebHDFSHelper(host="localhost", port=50070, user="root") 
        mock_init_service.assert_called_with(host="localhost", port=50070, user_name="root") 
        helper.create_dir( DATA_TEMP_PATH ) 
        mock_hdfs_client.assert_called_with(DATA_TEMP_PATH)

我們設個DEBUG中斷點看看,在初始化後,我們的內部變數已經被轉換成一個含有magicmock行為make_dir的物件。

因為初始化這邊沒有其他需要驗證的流程,因此我只去assert是否有照著我給的參數呼叫,這時管他這個環境是不是活著,我只需要知道”若”我給活的環境 就會正常。 若需要驗證例外行為,以這個例子,可以加入更多的例外處理。 這時一樣可以透過mock的return_value raise exception,這是做的到的,然而驗證的東西還是要自行定義,在一般函式環境的話,你可以因為例外給定預設行為,這樣也是有助於提高程式測試的覆蓋率。 例如:

mock_init_service.return_value = None
mock_init_service.side_effect = Exception("環境異常")
helper = WebHDFSHelper( host="localhost", port=50070, user="root" )
mock_init_service.assert_called_with( host="localhost", port=50070, user_name="root" )


如願拋出異常 最後,我們實際測試create_dir吧,第1段,我將他設定為mock_hdfs_client.return_value = True,第2段設定為False

確實會如願的走向1次成功、1次失敗,當然我這邊程式只為了驗證是否如配置去跑,所以只有這樣寫,一般成功、失敗處理上的邏輯差別還是會依原本物件設計的流程來運作 因此要「驗證」的東西與結果,仍然可以依自己的需求去調整與配置。 以上就是python的mock初體驗,以自己的例子還是比較清楚mock技巧的使用場景與方式…有錯的部分請不吝指教,感謝

 

測試程式完整版

from pywebhdfs.webhdfs import PyWebHdfsClient
class WebHDFSHelper:
    def __init__(self, data_path='', host='', port='', user=''):
        try:
            self.__hdfs_client = PyWebHdfsClient(host=host, port=port,
                                                 user_name=user)
        except Exception as err:
            print(str(err))
    
        def create_dir(self, full_dir_path):
            try:
                result = self.__hdfs_client.make_dir(full_dir_path)
                if result:
                    print("成功")
                else:
                    print("失敗")
            except:
                raise
            return result


class HDFSHelperTest(unittest.TestCase):
    @mock.patch('pywebhdfs.webhdfs.PyWebHdfsClient.__init__')
    @mock.patch('pywebhdfs.webhdfs.PyWebHdfsClient.make_dir')
    def test_create_dir(self, mock_hdfs_client, mock_init_service):
        mock_init_service.return_value = None

        mock_init_service.side_effect = Exception("環境異常")
        helper = WebHDFSHelper(host="localhost", port=50070, user="root")
        mock_init_service.assert_called_with(
            host="localhost", port=50070, user_name="root")

        mock_init_service.return_value = None
        mock_init_service.side_effect = None
        helper = WebHDFSHelper(host="localhost", port=50070, user="root")
        mock_init_service.assert_called_with(
            host="localhost", port=50070, user_name="root")
    
        mock_hdfs_client.return_value = True
        helper.create_dir(DATA_TEMP_PATH)
        mock_hdfs_client.assert_called_with(DATA_TEMP_PATH)
    
        mock_hdfs_client.return_value = False
        helper.create_dir(DATA_TEMP_PATH)
        mock_hdfs_client.assert_called_with(DATA_TEMP_PATH)
    pass