なになれ

IT系のことを記録していきます

いつも分からなくなるNode.jsのStream APIの使い方を理解する

この投稿は個人のメモ的な要素が強いです。
AWSを使っていると、S3にJSONファイルを配置する処理をたびたび行います。
その際にJSONからJSON Lines形式に変換すると、S3からデータを処理するときに都合が良いので、そのような処理もついでに行います。
Node.jsにおいては、JSONからJSON Linesに変換する際にStream APIを使うと、いい感じに処理できます。

全体のプログラムは以下にあります。

github.com

本投稿では、JSONからJSON Linesへのデータ変換を通して、Stream APIの使い方について紹介します。

内容

前提

まずは、以下のようにaxiosを使って、JSONのデータを取得することにします。

const response = await axios.get(
  'https://jsonplaceholder.typicode.com/posts'
);

読み取りStream

取得したJSONデータをStreamのデータに変換します。

const readable = Readable.from(response.data);

Readable.fromの引数には、配列などの繰り返し可能なオブジェクトを指定します。
ここでは、配列からStreamにすることで、配列の要素が順次処理されます。

書き込みStream

書き込み先はS3になるため、そのためのStreamを用意します。
S3では、uploadメソッドがStreamに対応しています。

const { writeStream, promise } = (function () {
  const pass = new PassThrough();
  return {
    writeStream: pass,
    promise: s3
      .upload({
        Bucket: 'hi1280-example',
        Key: 'data.json',
        Body: pass,
      })
      .promise(),
  };
})();

PassThroughオブジェクトを使用すると、読み取りStreamをそのままで、書き込みStreamとして扱うことができます。
s3のBodyにPassThroughで作成したStreamを指定することで、一連のStream処理で作成されたデータがS3にアップロードされます。

変換Stream

JSONJSON Linesに変換する部分です。

const transform = new Transform({
  transform(chunk, _, done) {
    this.push(JSON.stringify(chunk) + '\n');
    done();
  },
  objectMode: true,
});

chunkには、Readable.fromで渡した配列の要素が順次格納されます。
chunkをJSON文字列化して、JSON Linesの形式にします。
読み取りStreamがオブジェクトになっているので、objectModeはtrueにしないとエラーになります。

Streamを繋げる

各Streamをpipeで繋げます。

readable.pipe(transform).pipe(writeStream);

最後にS3へのアップロード処理が完了するまで、待つ必要があります。

await promise;

まとめ

Stream APIを使うためには、読み取り、書き込み、変換の各種APIを理解する必要があります。
Stream APIは順次データを処理する仕組みのため、適切に使用すればメモリ使用率を抑えられます。
しかし、本内容においては、最初にJSONのデータを一括でダウンロードしているため、その恩恵を受けられていません。
もっと良い方法があれば、改善していきたいと思います。

参考