Building a Bulk Asynchronous Bird Recipient Validation Tool


May 26, 2022

Published by

Zachary Samuels

Category

Email


One of the questions we occasionally receive is: how can I bulk validate email lists with Recipient Validation? There are two options here. One is to upload a file through the SparkPost UI for validation, and the other is to make individual calls per email to the API (as the API validates a single email per call).


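The first option works great but has a limitation of 20 MB (about 500,000 addresses). What if someone has an email list containing millions of addresses? This could mean splitting it up into 1,000 CSV files for upload.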


Since uploading thousands of CSV files seems a little far-fetched, I took that use case and began to wonder how fast I could get the API to run. In this blog post, I will explain what I tried and how I eventually arrived at a program that could complete around 100,000 validations in 55 seconds (whereas in the UI I got around 100,000 validations in 1 minute 10 seconds). And while it would still take about 100 hours to get through about 654 million validations, this script can run in the background, saving significant time.
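
As a rough sanity check on those figures, the estimate can be reproduced with a few lines of arithmetic. This is only a back-of-the-envelope sketch: the helper name `estimateHours` is made up for illustration, and it assumes throughput stays constant at the measured 100,000 validations per 55 seconds.

```javascript
// Back-of-the-envelope estimate. Assumes throughput stays constant at the
// measured rate (an assumption; real runs will vary).
function estimateHours(totalValidations, batchSize, batchSeconds) {
  const perSecond = batchSize / batchSeconds; // validations per second
  const totalSeconds = totalValidations / perSecond;
  return totalSeconds / 3600; // seconds -> hours
}

const hours = estimateHours(654000000, 100000, 55);
console.log(`~${hours.toFixed(1)} hours`);
```

At the measured rate this works out to just under 100 hours, which matches the figure quoted above.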


The final version of the program can be found here.


My first mistake: using Python

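Python is one of my favorite programming languages. It excels in many areas and is incredibly straightforward. However, one area it does not excel in is concurrent processes. While Python can run asynchronous functions, it has what is known as the Python Global Interpreter Lock, or GIL.


"The Python Global Interpreter Lock or GIL, in simple words, is a mutex (or a lock) that allows only one thread to hold the control of the Python interpreter.


This means that only one thread can be in a state of execution at any point in time. The impact of the GIL isn't visible to developers who execute single-threaded programs, but it can be a performance bottleneck in CPU-bound and multi-threaded code.


Since the GIL allows only one thread to execute at a time even in a multi-threaded architecture with more than one CPU core, the GIL has gained a reputation as an “infamous” feature of Python." (https://realpython.com/python-gil/)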


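At first, I wasn't aware of the GIL, so I started programming in Python. In the end, even though my program was asynchronous, it was getting locked up, and no matter how many threads I added, I still only got about 12-15 iterations per second.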


The main portion of the asynchronous function in Python can be seen below:

    import aiohttp
    import requests
    from tqdm import tqdm

    # `url` (the API base URL) is defined elsewhere in the script.
    async def validateRecipients(f, fh, apiKey, snooze, count):
        h = {'Authorization': apiKey, 'Accept': 'application/json'}
        with tqdm(total=count) as pbar:
            async with aiohttp.ClientSession() as session:
                for address in f:
                    for i in address:
                        thisReq = requests.compat.urljoin(url, i)
                        # Each request is awaited before the next one starts,
                        # so the calls run one at a time.
                        async with session.get(thisReq, headers=h, ssl=False) as resp:
                            content = await resp.json()
                            row = content['results']
                            row['email'] = i
                            fh.writerow(row)
                            pbar.update(1)

 

So I scrapped Python and went back to the drawing board...


I settled on NodeJS because it performs non-blocking I/O operations extremely well. I am also fairly familiar with programming in NodeJS.


Utilizing asynchronous aspects of NodeJS, this ended up working well. For more details about asynchronous programming in NodeJS, see https://blog.risingstack.com/node-hero-async-programming-in-node-js/
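
To make the non-blocking idea concrete, here is a minimal sketch (not taken from the original program) in which every call is started before any of them finishes. `fakeValidate` is a hypothetical stand-in for an HTTP request, simulated with a timer so the example runs without network access.

```javascript
// Counters used only to show how many calls overlap.
let inFlight = 0;
let peakInFlight = 0;

// Hypothetical stand-in for an API call: resolves after delayMs.
function fakeValidate(email, delayMs = 20) {
  inFlight++;
  peakInFlight = Math.max(peakInFlight, inFlight);
  return new Promise((resolve) => {
    setTimeout(() => {
      inFlight--;
      resolve({ email, valid: email.includes("@") });
    }, delayMs);
  });
}

// Start every request immediately, then wait for all of them together.
// The event loop lets the pending requests overlap while each one waits.
function validateAll(emails) {
  return Promise.all(emails.map((email) => fakeValidate(email)));
}

validateAll(["a@example.com", "b@example.com", "not-an-email"]).then((results) => {
  console.log(results.length, "results; peak requests in flight:", peakInFlight);
});
```

Because nothing is awaited inside the loop, all three simulated requests are pending at the same time, which is the behavior the final program relies on.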


Second mistake: trying to read the file into memory

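My original idea was as follows:


First, ingest a CSV list of emails. Second, load the emails into an array and check that they are in the correct format. Third, asynchronously call the Recipient Validation API. Fourth, wait for the results and load them into a variable. Finally, output this variable to a CSV file.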


This worked very well for smaller files. The issue came when I tried to run 100,000 emails through. The program stalled at around 12,000 validations. With the help of one of our front-end developers, I saw that the issue was with loading all the results into a variable (and therefore running out of memory quickly). If you would like to see the first iteration of this program, I have linked it here: Version 1 (NOT RECOMMENDED).



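The revised approach was as follows. First, ingest a CSV list of emails. Second, count the number of emails in the file for reporting purposes. Third, as each line is read asynchronously, call the Recipient Validation API and output the results to a CSV file.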


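Thus, for each line read in, I call the API and write out the results asynchronously so as not to keep any of this data in long-term memory. I also removed the email syntax checking after speaking with the Recipient Validation team, as they informed me that Recipient Validation already has built-in checks for whether an email is valid.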
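
The streaming idea can be sketched roughly as follows. This is an illustration rather than the program's actual code: it uses Node's built-in `readline` module instead of a CSV library, and `validate` is a hypothetical async function standing in for the Recipient Validation API call.

```javascript
const fs = require("fs");
const readline = require("readline");

// Reads inPath line by line, validates each email, and writes each result
// to outPath as soon as it arrives, so results never pile up in an array.
// `validate` is hypothetical: (email) => Promise<{ valid: boolean }>.
function streamValidate(inPath, outPath, validate) {
  return new Promise((resolve, reject) => {
    const input = fs.createReadStream(inPath);
    const output = fs.createWriteStream(outPath);
    const lines = readline.createInterface({ input, crlfDelay: Infinity });
    let started = 0;
    let finished = 0;
    let inputDone = false;

    // Close the outfile once every started call has written its row.
    const finishIfDone = () => {
      if (inputDone && finished === started) output.end(() => resolve(finished));
    };

    input.on("error", reject);
    output.on("error", reject);
    output.write("Email,Valid\n");

    lines.on("line", (line) => {
      const email = line.trim();
      if (!email) return; // skip blank lines
      started++;
      validate(email)
        .then((result) => output.write(`${email},${result.valid}\n`))
        .catch(() => output.write(`${email},error\n`))
        .finally(() => {
          finished++;
          finishIfDone();
        });
    });

    lines.on("close", () => {
      inputDone = true;
      finishIfDone();
    });
  });
}
```

Rows are written in the order the calls complete, not the order of the input file. Like the approach described above, this sketch does not cap how many calls are in flight at once.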


Breaking down the final code

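After reading in and validating the terminal arguments, I run the following code. First, I read in the CSV file of emails and count each line. This function serves two purposes: 1) it allows me to accurately report on file progress (as we will see later), and 2) it allows me to stop a timer when the number of emails in the file equals the number of completed validations. I added the timer so I can run benchmarks and ensure I am getting good results.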


    let count = 0; // Line count
    require("fs")
      .createReadStream(myArgs[1])
      .on("data", function (chunk) {
        // Reads the infile and increases the count for each line
        // (10 is the byte value of the newline character)
        for (let i = 0; i < chunk.length; ++i) if (chunk[i] == 10) count++;
      })
      .on("close", function () {
        // At the end of the infile, after all lines have been counted,
        // run the recipient validation function
        validateRecipients.validateRecipients(count, myArgs);
      });

 

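I then call the validateRecipients function. Note that this function is asynchronous. After validating that the infile and outfile are both CSV, I write a header row and start a program timer using the JSDOM library.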


    // Modules this excerpt relies on
    const fs = require("fs");
    const { extname } = require("path");
    const { JSDOM } = require("jsdom");

    async function validateRecipients(email_count, myArgs) {
      if (
        // If both the infile and outfile are in .csv format
        extname(myArgs[1]).toLowerCase() == ".csv" &&
        extname(myArgs[3]).toLowerCase() == ".csv"
      ) {
        let completed = 0; // Counter for each API call
        email_count++; // Line counter returns #lines - 1, this is done to correct the number of lines

        // Start a timer
        const { window } = new JSDOM();
        const start = window.performance.now();

        const output = fs.createWriteStream(myArgs[3]); // Outfile
        output.write(
          "Email,Valid,Result,Reason,Is_Role,Is_Disposable,Is_Free,Delivery_Confidence\n"
        ); // Write the headers in the outfile

        // (the function continues in the excerpts below)
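
As an aside, the timer does not strictly depend on JSDOM. Node's built-in `perf_hooks` module exposes a `performance.now()` call as well, so an alternative could look like the sketch below. This is a swapped-in technique, not what the program does, and the `timeIt` helper is a hypothetical name used only for illustration.

```javascript
const { performance } = require("perf_hooks");

// Runs an async task and reports how long it took, in seconds.
async function timeIt(task) {
  const start = performance.now(); // high-resolution timestamp in milliseconds
  const result = await task();
  const seconds = (performance.now() - start) / 1000;
  return { result, seconds };
}

// Usage: time a simulated 50 ms task.
timeIt(() => new Promise((resolve) => setTimeout(() => resolve("done"), 50))).then(
  ({ result, seconds }) => console.log(`${result} in ${seconds.toFixed(3)} seconds`)
);
```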

 

The following script is really the bulk of the program, so I will break it up and explain what is happening. For each line of the infile:


    fs.createReadStream(myArgs[1])
      .pipe(csv.parse({ headers: false }))
      .on("data", async (email) => {
        // For each row read in from the infile, call the SparkPost Recipient Validation API
        let url = SPARKPOST_HOST + "/api/v1/recipient-validation/single/" + email;
        await axios
          .get(url, {
            headers: {
              Authorization: SPARKPOST_API_KEY,
            },
          })

 

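Then, on the response:

  • Add the email to the JSON (so the email can be printed in the CSV)

  • Check whether reason is null, and if so, populate an empty value (this keeps the CSV format consistent, as reason is only given in the response in some cases)

  • Set the options and keys for the json2csv module

  • Convert the JSON to CSV and output it (utilizing json2csv)

  • Write progress in the terminal

  • Finally, if the number of emails in the file = completed validations, stop the timer and print out the results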


    .then(function (response) {
      // Adds the email as a key/value pair to the response JSON to be used for output
      response.data.results.email = String(email);
      // If reason is null, set it to blank so the CSV is uniform
      if (!response.data.results.reason) response.data.results.reason = "";

      // Utilizes json-2-csv to convert the JSON to CSV format and output
      let options = {
        prependHeader: false, // Disables JSON values from being added as header rows for every line
        keys: [
          "results.email",
          "results.valid",
          "results.result",
          "results.reason",
          "results.is_role",
          "results.is_disposable",
          "results.is_free",
          "results.delivery_confidence",
        ], // Sets the order of keys
      };
      let json2csvCallback = function (err, csv) {
        if (err) throw err;
        output.write(`${csv}\n`);
      };
      converter.json2csv(response.data, json2csvCallback, options);

      completed++; // Increase the API counter
      // Output status of Completed / Total to the console without showing new lines
      process.stdout.write(`Done with ${completed} / ${email_count}\r`);

      // If all emails have completed validation
      if (completed == email_count) {
        const stop = window.performance.now(); // Stop the timer
        console.log(
          `All emails successfully validated in ${(stop - start) / 1000} seconds`
        );
      }
    })
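
To see the row-shaping step in isolation, here is a dependency-free sketch of the same idea. It is not the program's code: it replaces json2csv with a small hand-written helper (`toCsvRow`, a hypothetical name), and the sample response values are invented purely for illustration. Only the field names come from the key list above.

```javascript
// Column order matches the header row written to the outfile.
const COLUMNS = [
  "email", "valid", "result", "reason",
  "is_role", "is_disposable", "is_free", "delivery_confidence",
];

// Builds one CSV line from an API `results` object (shape assumed from the
// keys listed above). Missing values such as `reason` become empty cells.
function toCsvRow(email, results) {
  const row = { ...results, email: String(email) };
  return COLUMNS.map((key) => {
    const value = row[key] === undefined || row[key] === null ? "" : String(row[key]);
    // Quote cells that contain commas, quotes, or newlines.
    return /[",\n]/.test(value) ? `"${value.replace(/"/g, '""')}"` : value;
  }).join(",");
}

// Hypothetical sample `results` object, for illustration only.
const sample = {
  valid: true,
  result: "valid",
  is_role: false,
  is_disposable: false,
  is_free: true,
  delivery_confidence: 95,
};
console.log(toCsvRow("user@example.com", sample));
```

Because `reason` is absent from the sample, the fourth cell comes out empty, which keeps every row at the same number of columns.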

 

The one final issue I found was that although this worked great on Mac, I ran into the following error on Windows after around 10,000 validations:


Error: connect ENOBUFS XX.XX.XXX.XXX:443 - Local (undefined:undefined) with email XXXXXXX@XXXXXXXXXX.XXX


After doing some further research, it appears to be an issue with the NodeJS HTTP client connection pool not reusing connections. I found this Stack Overflow post on the issue, and after further digging, found a good default configuration for the axios library that resolved it. I am still not certain why this issue only happens on Windows and not on Mac.
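
For reference, a connection-reuse setup along the following lines is a common remedy for this class of error. The sketch is an assumption about what such a configuration might look like, not the exact settings used in the program: the `maxSockets` value of 50 is arbitrary, and the axios lines are shown only as comments so the snippet runs with Node's standard library alone.

```javascript
const http = require("http");
const https = require("https");

// Agents that keep sockets open and reuse them across requests, with a cap
// on simultaneous connections (50 is an arbitrary, illustrative value).
const agentOptions = { keepAlive: true, maxSockets: 50 };
const httpAgent = new http.Agent(agentOptions);
const httpsAgent = new https.Agent(agentOptions);

// With axios installed, the agents could be set as library-wide defaults:
//   const axios = require("axios");
//   axios.defaults.httpAgent = httpAgent;
//   axios.defaults.httpsAgent = httpsAgent;

console.log("keepAlive:", httpsAgent.keepAlive, "maxSockets:", httpsAgent.maxSockets);
```

Capping `maxSockets` limits how many connections are open at once, and `keepAlive` lets finished requests hand their socket to the next one instead of opening a new connection each time.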


Next steps

If you are looking for a simple, fast program that takes in a CSV, calls the Recipient Validation API, and outputs a CSV, this program is for you.


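Some additions to this program would be the following:

  • Build a front end or an easier UI for use

  • Better error and retry handling, because if the API throws an error for some reason, the program currently doesn't retry the call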
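
As a starting point for the retry item, a wrapper like the one below could be placed around each API call. This is a hypothetical sketch, not part of the current program: the name `withRetry`, the default of three retries, and the 200 ms base delay are all illustrative choices.

```javascript
// Retries an async operation with exponential backoff.
// `operation` is any function returning a promise (for example, one API call).
async function withRetry(operation, { retries = 3, baseDelayMs = 200 } = {}) {
  let lastError;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await operation(attempt);
    } catch (err) {
      lastError = err;
      if (attempt === retries) break; // out of attempts
      const delay = baseDelayMs * 2 ** attempt; // doubles after each failure
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
  throw lastError;
}

// Usage with a hypothetical flaky operation that fails twice, then succeeds.
let calls = 0;
withRetry(
  async () => {
    calls++;
    if (calls < 3) throw new Error("temporary failure");
    return "ok";
  },
  { baseDelayMs: 10 }
).then((value) => console.log(value, "after", calls, "calls"));
```

In practice one would probably retry only on transient failures (timeouts, rate limiting, server errors) rather than on every error, but that policy is left out of this sketch.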


I'd also be curious to see whether faster results could be achieved with another language such as Golang or Erlang/Elixir.


Please feel free to send me any feedback or suggestions for expanding this project.
