Subscribed unsubscribe Subscribe Subscribe

AWS Lambdaを利用しスクレイピングしたい

  • なんとか EC2 にインスタンス立てたりすることなく楽に安く済ませたい
  • 適度な速度でHTMLをダウンロードしてきたい

HTML 保存した後はまあ適当にやれば良いと思っていて、ゆっくりダウンロードしてくるところが面倒。

今動いてるやつは 200-300個ぐらいしか URL がないのもあって Timeout を5分に設定した lambda:crawl で setTimeout しながら lambda:download_html を invoke してるんだけど増えてきて5分で終わらなくなったら困るなというのがあって考えたやつ

1つめの方法

  1. 適当な URL が DynamoDB とかに入ってるとして lambda:crawl で Table を Scan して SQS へ {s3_key: "", url: ""} のようなメッセージを送っていく
  2. lambda:consume_download_html_queue を Scheduled Event で5分おきに実行し、適度な速度で5分間メッセージを消費/lambda:download_html の invoke を行う
  3. lambda:download_html で HTML をダウンロードし、S3 に Put する

2つめの方法

SQS について調べるの面倒だなと思っていたら浮かんだやつ。

{tableName: "ダイナモ"} みたいなのを渡されてそっから URL とってきて動きはじめて、いくらか処理してから残ってたら自分に {urls: [処理してないURL]} を渡してinvoke する。

var AWS = require("aws-sdk");
var lambda = new AWS.Lambda();
var dynamoDB = new AWS.DynamoDB();
var s3 = new AWS.S3();

var Promise = require('bluebird');
Promise.promisifyAll(Object.getPrototypeOf(dynamoDB));
Promise.promisifyAll(Object.getPrototypeOf(lambda));

var MAX_FETCH_COUNT = 300;
var FETCH_INTERVAL = 1000;
var DOWNLOAD_HTML_LAMBDA_FUNCTION_NAME = "download-html";

exports.handler = function crawl(event, context) {
  console.log("start to crawl");

  var fetchUrls = event.urls ? Promise.resolve(event.urls) : fetchUrlsFromTable(event.tableName);
  fetchUrls
    .then(function(urls) {
      var fetchCount = Math.min(MAX_FETCH_COUNT, urls.length);
      var urlsToFetch = [];
      for (var i = 0; i < fetchCount; i++) {
        urlsToFetch.push(urls.pop());
      }

      return Promise
        .all(urlsToFetch.map(function(item, index) {
          // downloaded_html/site_a/ 以下にズラっとファイルを並べたいので / を適当に置換しておく
          var s3_key = "downloaded_html/site_a/" + url.replace(/\//g, "$");
          var params = buildLambdaParams(DOWNLOAD_HTML_LAMBDA_FUNCTION_NAME, {
            url: url,
            s3_key: s3_key
          });
          return delayedLambdaInvoke(params, FETCH_INTERVAL * index);
        }))
        .then(function(_) {
          console.log("finish to crawl");

          if (urls.length > 0) {
            var params = buildLambdaParams(process.env.AWS_LAMBDA_FUNCTION_NAME, {
              urls: urls
            });
            return lambda.invokeAsync(params);
          }
        });
    })
    .then(function(_) {
      context.done(null, "OK");
    })
    .catch(function(err) {
      context.done(err);
    });
};

function fetchUrlsDynamoDB(tableName) {
  return dynamoDB
    .scanAsync({
      TableName: tableName
    })
    .then(function(data) {
      var urls = [];
      data.Items.forEach(function(item) {
        urls.push(item.url.S);
      });

      return Promise.resolve(urls);
    });
}

function delayedLambdaInvoke(params, delay) {
  return new Promise(function(resolve, reject) {
    setTimeout(function() {
      console.log("Invoke lambda with " + JSON.stringify(params, null, 2));

      lambda.invokeAsync(params, function(err, data) {
        if (err) {
          reject(err);
        } else {
          resolve(data);
        }
      });
    }, delay);
  });
}

function lambdaInvoke(params) {
  return new Promise(function(resolve, reject) {
    console.log("Invoke lambda with " + JSON.stringify(params, null, 2));

    lambda.invokeAsync(params, function(err, data) {
      if (err) {
        reject(err);
      } else {
        resolve(data);
      }
    });
  });
}

function buildLambdaParams(funcName, payloadObject) {
  return {
    FunctionName: funcName,
    InvocationType: 'Event',
    LogType: 'None',
    Payload: JSON.stringify(payloadObject, null, 2),
  };
}