いつも分からなくなるNode.jsのStream APIの使い方を理解する
この投稿は個人のメモ的な要素が強いです。
AWSを使っていると、S3にJSONファイルを配置する処理をたびたび行います。
その際にJSONからJSON Lines形式に変換すると、S3からデータを処理するときに都合が良いので、そのような処理もついでに行います。
Node.jsにおいては、JSONからJSON Linesに変換する際にStream APIを使うと、いい感じに処理できます。
全体のプログラムは以下にあります。
本投稿では、JSONからJSON Linesへのデータ変換を通して、Stream APIの使い方について紹介します。
内容
前提
まずは、以下のようにaxiosを使って、JSONのデータを取得することにします。
const response = await axios.get( 'https://jsonplaceholder.typicode.com/posts' );
読み取りStream
取得したJSONデータをStreamのデータに変換します。
const readable = Readable.from(response.data);
Readable.fromの引数には、配列などの繰り返し可能なオブジェクトを指定します。
ここでは、配列からStreamにすることで、配列の要素が順次処理されます。
書き込みStream
書き込み先はS3になるため、そのためのStreamを用意します。
S3では、uploadメソッドがStreamに対応しています。
const { writeStream, promise } = (function () { const pass = new PassThrough(); return { writeStream: pass, promise: s3 .upload({ Bucket: 'hi1280-example', Key: 'data.json', Body: pass, }) .promise(), }; })();
PassThroughオブジェクトを使用すると、読み取りStreamをそのままで、書き込みStreamとして扱うことができます。
s3のBodyにPassThroughで作成したStreamを指定することで、一連のStream処理で作成されたデータがS3にアップロードされます。
変換Stream
const transform = new Transform({ transform(chunk, _, done) { this.push(JSON.stringify(chunk) + '\n'); done(); }, objectMode: true, });
chunkには、Readable.fromで渡した配列の要素が順次格納されます。
chunkをJSON文字列化して、JSON Linesの形式にします。
読み取りStreamがオブジェクトになっているので、objectModeはtrueにしないとエラーになります。
Streamを繋げる
各Streamをpipeで繋げます。
readable.pipe(transform).pipe(writeStream);
最後にS3へのアップロード処理が完了するまで、待つ必要があります。
await promise;
まとめ
Stream APIを使うためには、読み取り、書き込み、変換の各種APIを理解する必要があります。
Stream APIは順次データを処理する仕組みのため、適切に使用すればメモリ使用率を抑えられます。
しかし、本内容においては、最初にJSONのデータを一括でダウンロードしているため、その恩恵を受けられていません。
もっと良い方法があれば、改善していきたいと思います。