AWS Lambdaを利用しスクレイピングしたい
- なんとか EC2 にインスタンス立てたりすることなく楽に安く済ませたい
- 適度な速度でHTMLをダウンロードしてきたい
HTML 保存した後はまあ適当にやれば良いと思っていて、ゆっくりダウンロードしてくるところが面倒。
今動いてるやつは 200-300個ぐらいしか URL がないのもあって Timeout を5分に設定した lambda:crawl で setTimeout しながら lambda:download_html を invoke してるんだけど増えてきて5分で終わらなくなったら困るなというのがあって考えたやつ
1つめの方法
- 適当な URL が DynamoDB とかに入ってるとして lambda:crawl で Table を Scan して SQS へ
{s3_key: "", url: ""}
のようなメッセージを送っていく - lambda:consume_download_html_queue を Scheduled Event で5分おきに実行し、適度な速度で5分間メッセージを消費/lambda:download_html の invoke を行う
- lambda:download_html で HTML をダウンロードし、S3 に Put する
2つめの方法
SQS について調べるの面倒だなと思っていたら浮かんだやつ。
{tableName: "ダイナモ"} みたいなのを渡されてそっから URL とってきて動きはじめて、いくらか処理してから残ってたら自分に {urls: [処理してないURL]} を渡してinvoke する。
var AWS = require("aws-sdk"); var lambda = new AWS.Lambda(); var dynamoDB = new AWS.DynamoDB(); var s3 = new AWS.S3(); var Promise = require('bluebird'); Promise.promisifyAll(Object.getPrototypeOf(dynamoDB)); Promise.promisifyAll(Object.getPrototypeOf(lambda)); var MAX_FETCH_COUNT = 300; var FETCH_INTERVAL = 1000; var DOWNLOAD_HTML_LAMBDA_FUNCTION_NAME = "download-html"; exports.handler = function crawl(event, context) { console.log("start to crawl"); var fetchUrls = event.urls ? Promise.resolve(event.urls) : fetchUrlsFromTable(event.tableName); fetchUrls .then(function(urls) { var fetchCount = Math.min(MAX_FETCH_COUNT, urls.length); var urlsToFetch = []; for (var i = 0; i < fetchCount; i++) { urlsToFetch.push(urls.pop()); } return Promise .all(urlsToFetch.map(function(item, index) { // downloaded_html/site_a/ 以下にズラっとファイルを並べたいので / を適当に置換しておく var s3_key = "downloaded_html/site_a/" + url.replace(/\//g, "$"); var params = buildLambdaParams(DOWNLOAD_HTML_LAMBDA_FUNCTION_NAME, { url: url, s3_key: s3_key }); return delayedLambdaInvoke(params, FETCH_INTERVAL * index); })) .then(function(_) { console.log("finish to crawl"); if (urls.length > 0) { var params = buildLambdaParams(process.env.AWS_LAMBDA_FUNCTION_NAME, { urls: urls }); return lambda.invokeAsync(params); } }); }) .then(function(_) { context.done(null, "OK"); }) .catch(function(err) { context.done(err); }); }; function fetchUrlsDynamoDB(tableName) { return dynamoDB .scanAsync({ TableName: tableName }) .then(function(data) { var urls = []; data.Items.forEach(function(item) { urls.push(item.url.S); }); return Promise.resolve(urls); }); } function delayedLambdaInvoke(params, delay) { return new Promise(function(resolve, reject) { setTimeout(function() { console.log("Invoke lambda with " + JSON.stringify(params, null, 2)); lambda.invokeAsync(params, function(err, data) { if (err) { reject(err); } else { resolve(data); } }); }, delay); }); } function lambdaInvoke(params) { return new Promise(function(resolve, reject) { console.log("Invoke lambda with " + JSON.stringify(params, null, 2)); lambda.invokeAsync(params, function(err, data) { if (err) { reject(err); } else { resolve(data); } }); }); } function buildLambdaParams(funcName, payloadObject) { return { FunctionName: funcName, InvocationType: 'Event', LogType: 'None', Payload: JSON.stringify(payloadObject, null, 2), }; }