This subsection presents the implementation details of the Node.js read/write streams and the custom Multer storage engine working with the AWS S3 SDK.
First, initialize the S3Client object with credentials, region, and other HTTP configurations.
import { S3Client } from '@aws-sdk/client-s3';
import dotenv from 'dotenv';
dotenv.config();
import { NodeHttpHandler } from "@smithy/node-http-handler";
import https from "https";
let jsonSecret = {
AWS_ACCESS_KEY_ID: process.env.AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY: process.env.AWS_SECRET_ACCESS_KEY,
BUCKET_NAME: process.env.BUCKET_NAME,
};
let s3 = new S3Client({
credentials: {
accessKeyId: jsonSecret.AWS_ACCESS_KEY_ID,
secretAccessKey: jsonSecret.AWS_SECRET_ACCESS_KEY,
},
region: "us-east-1",
// Use a custom request handler so that we can adjust the HTTPS Agent and
// socket behavior.
requestHandler: new NodeHttpHandler({
httpsAgent: new https.Agent({
maxSockets: 500,
// keepAlive is a default from AWS SDK. We want to preserve this for
// performance reasons.
keepAlive: true,
keepAliveMsecs: 1000,
}),
socketTimeout: 900000,
}),
});
export { s3, jsonSecret };
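The configuration reads AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and BUCKET_NAME from the environment via dotenv. As a quick sanity check (not part of the original module), one could verify the client and bucket with a HeadBucketCommand, assuming the exported s3 and jsonSecret objects:
import { HeadBucketCommand } from "@aws-sdk/client-s3";
import { s3, jsonSecret } from "service/fileSystem/awsS3Config";

// Hypothetical check: fails fast if credentials, region, or bucket name are wrong.
s3.send(new HeadBucketCommand({ Bucket: jsonSecret.BUCKET_NAME ?? "" }))
  .then(() => console.log("S3 bucket reachable"))
  .catch((err) => console.error("S3 configuration problem:", err));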
Next, initialize singleton objects for the file system path and file system actions based on the environment variable IS_AWS_S3, and create the necessary directories.
import dotenv from 'dotenv';
dotenv.config();
import { AbstractFileSystemAction, LocalFileSystemAction } from 'service/fileSystem/localFileSystemAction';
import { AbstractFileSystemPath, LocalFileSystemPath } from 'service/fileSystem/localFileSystemPath';
import { AWSS3FileSystemAction } from 'service/fileSystem/awsS3FileSystemAction';
import { AWSS3FileSystemPath } from 'service/fileSystem/awsS3FileSystemPath';
let fileSystemActionObject: AbstractFileSystemAction;
let fileSystemPathObject: AbstractFileSystemPath;
const isAWSS3 = process.env.IS_AWS_S3 === '1';
if (isAWSS3) {
fileSystemActionObject = new AWSS3FileSystemAction();
fileSystemPathObject = new AWSS3FileSystemPath();
} else {
fileSystemActionObject = new LocalFileSystemAction();
fileSystemPathObject = new LocalFileSystemPath();
}
fileSystemActionObject.createDir(fileSystemPathObject.streamDirectoryAbsolutePath(), { recursive: true });
fileSystemActionObject.createDir(fileSystemPathObject.uploadVideoDirectoryAbsolutePath(), { recursive: true });
fileSystemActionObject.createDir(fileSystemPathObject.uploadChunkDirectoryAbsolutePath(), { recursive: true });
export { fileSystemActionObject, fileSystemPathObject };
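The AbstractFileSystemAction and AbstractFileSystemPath types are not shown here; based on the methods invoked throughout this section, a rough sketch of the expected contracts could look like the following (the method names are taken from the calls in this section, everything else is an assumption):
// Hypothetical sketch of the file-system abstractions, inferred from their usage below.
import { Readable, Writable } from "node:stream";

export interface AbstractFileSystemPath {
  streamDirectoryAbsolutePath(): string;
  uploadVideoDirectoryAbsolutePath(): string;
  uploadChunkDirectoryAbsolutePath(): string;
  uploadVideoFilePath(fileName: string): string;
  uploadChunkFilePath(fileName: string): string;
}

export interface AbstractFileSystemAction {
  createDir(dirPath: string, options?: { recursive?: boolean }): void;
  createReadStream(filePath: string, options?: { highWaterMark?: number }): Readable;
  createWriteStream(filePath: string, options?: { highWaterMark?: number }): Writable;
  rmFile(filePath: string): void;
  pipeReadToWrite(readStream: Readable, writeStream: Writable, options?: { end?: boolean }): void;
  addEventListenerReadStream(readStream: Readable, event: string, listener: (...args: any[]) => void): void;
  addEventListenerWriteStream(writeStream: Writable, event: string, listener: (...args: any[]) => void): void;
}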
Implement a write stream to an AWS S3 bucket file path with three steps of operation:
- _construct method: starts the multipart upload with the CreateMultipartUploadCommand class.
- _write method: buffers incoming chunks and uploads them with the UploadPartCommand class.
- _final method (called after the end event is emitted to the write stream): awaits all part uploads and calls the CompleteMultipartUploadCommand class.
If the stream is destroyed with an error, the DeleteObjectCommand class is called to remove the partially written object.
import { s3 } from 'service/fileSystem/awsS3Config';
import {
_Object,
CreateMultipartUploadCommand,
UploadPartCommand,
CompleteMultipartUploadCommand,
DeleteObjectCommand
} from "@aws-sdk/client-s3";
import { jsonSecret } from "service/fileSystem/awsS3Config";
import { Writable } from "node:stream";
export class AWSS3FileWriteStream extends Writable {
highWaterMark: number;
filePath: string;
chunks: Uint8Array = new Uint8Array();
writesCount: number = 0;
uploadResultsPromise: Promise<any>[] = [];
uploadId: string; // AWS S3 Object Multipart Id, retain it in the whole process
constructor({ highWaterMark, filePath }) {
super({ highWaterMark });
this.highWaterMark = highWaterMark;
this.filePath = filePath;
}
/**
*
* @param callback This optional function will be called in a tick after the stream constructor has returned,
* delaying any _write(), _final() and _destroy() calls until callback is called.
* This is useful to initialize state or asynchronously initialize resources before the stream can be used.
*/
_construct(callback: (error?: Error | null) => void): void {
console.log('AWS S3 write stream _construct called');
s3.send(
new CreateMultipartUploadCommand({
Bucket: jsonSecret.BUCKET_NAME ? jsonSecret.BUCKET_NAME : "",
Key: this.filePath,
}),
).then((multipartUpload) => {
console.log('AWS S3 write stream _construct called');
console.log('CreateMultipartUploadCommand success uploadId:', multipartUpload.UploadId);
this.uploadId = multipartUpload.UploadId;
callback();
}).catch((err) => {
callback(err);
});
}
chunksConcat(newChunk: Uint8Array): void {
var mergedArray = new Uint8Array(this.chunks.byteLength + newChunk.byteLength);
mergedArray.set(this.chunks);
mergedArray.set(newChunk, this.chunks.byteLength);
this.chunks = mergedArray;
}
_write(chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void): void {
console.log('AWS S3 write stream _write called chunk:', chunk);
// chunk = Uint8Array.from(chunk);
this.chunksConcat(chunk);
if (this.chunks.byteLength >= this.highWaterMark) {
this.writesCount += 1;
let partNumber = this.writesCount;
let uploadingChunk = this.chunks;
this.chunks = new Uint8Array(0);
// console.log('uploadingChunk', uploadingChunk);
// console.log('uploadingChunk.byteLength:', uploadingChunk.byteLength);
// console.log(`AWSWriteStream uploading part ${partNumber}, ${uploadingChunk.byteLength} bytes`);
this.uploadResultsPromise.push(
s3.send(new UploadPartCommand({
Bucket: jsonSecret.BUCKET_NAME ? jsonSecret.BUCKET_NAME : "",
Key: this.filePath,
UploadId: this.uploadId,
Body: uploadingChunk,
PartNumber: partNumber
})).then((uploadResult) => {
console.log(`File ${this.filePath} write part number`, partNumber, "uploaded");
console.log('Write stream uploadResult', uploadResult);
return uploadResult;
}).catch((err) => {
console.log('WriteStream UploadPartCommand error', err);
// Re-throw so Promise.all() in _final() rejects; callback() is already invoked
// for this write below, and calling it a second time with an error is invalid.
throw err;
})
);
callback();
} else {
// when we are done, we should call the callback function
callback();
}
}
// this will run after the our stream has finished
_final(callback) {
if (this.chunks.byteLength > 0) {
this.writesCount += 1;
let partNumber = this.writesCount;
let uploadingChunk = this.chunks;
this.chunks = new Uint8Array(0);
// console.log(`AWSWriteStream uploading last part ${partNumber}, ${uploadingChunk.byteLength} bytes`);
this.uploadResultsPromise.push(
s3.send(new UploadPartCommand({
Bucket: jsonSecret.BUCKET_NAME ? jsonSecret.BUCKET_NAME : "",
Key: this.filePath,
UploadId: this.uploadId,
Body: uploadingChunk,
PartNumber: partNumber
})).then((uploadResult) => {
console.log(`File ${this.filePath} write part number`, partNumber, "uploaded");
return uploadResult;
}).catch((err) => {
console.log('WriteStream last part UploadPartCommand error', err);
// Re-throw so Promise.all() below rejects and callback(err) runs exactly once.
throw err;
})
);
}
console.log('AWS S3 write stream _final called');
Promise.all(this.uploadResultsPromise)
.then((uploadResults) => {
console.log('CompleteMultipartUploadCommand config', {
Bucket: jsonSecret.BUCKET_NAME ? jsonSecret.BUCKET_NAME : "",
Key: this.filePath,
UploadId: this.uploadId,
MultipartUpload: {
Parts: uploadResults.map(({ ETag }, i) => ({
ETag,
PartNumber: i + 1,
})),
},
});
s3.send(
new CompleteMultipartUploadCommand({
Bucket: jsonSecret.BUCKET_NAME ? jsonSecret.BUCKET_NAME : "",
Key: this.filePath,
UploadId: this.uploadId,
MultipartUpload: {
Parts: uploadResults.map(({ ETag }, i) => ({
ETag,
PartNumber: i + 1,
})),
},
}),
)
.then((_) => callback())
.catch((err) => { console.log('CompleteMultipartUploadCommand error', err); callback(err) });
})
.catch((err) => {
console.log('Promise.all(this.uploadResultsPromise)', err);
callback(err);
});
}
_destroy(error, callback) {
console.log("Write Count:", this.writesCount);
if (error) {
const deleteCommand = new DeleteObjectCommand({
Bucket: jsonSecret.BUCKET_NAME ? jsonSecret.BUCKET_NAME : "",
Key: this.filePath,
})
s3.send(deleteCommand).then((_) => callback(error)).catch((_) => callback(error));
} else {
callback();
}
}
}
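A minimal usage sketch of this write stream follows; the module path and file paths are assumptions. Because S3 rejects multipart parts smaller than 5 MiB (except the last one), highWaterMark should be at least 5 * 1024 * 1024 so that every buffered part meets the minimum size.
import fs from "node:fs";
import { AWSS3FileWriteStream } from "service/fileSystem/awsS3FileWriteStream"; // hypothetical module path

const writeStream = new AWSS3FileWriteStream({
  highWaterMark: 5 * 1024 * 1024, // >= 5 MiB so each uploaded part satisfies the S3 minimum part size
  filePath: "upload/videos/example.mp4", // hypothetical S3 object key
});

const source = fs.createReadStream("/tmp/example.mp4"); // hypothetical local source file
source.on("error", (err) => console.error("Reading local file failed:", err));
source
  .pipe(writeStream)
  .on("finish", () => console.log("Multipart upload completed"))
  .on("error", (err) => console.error("Multipart upload failed:", err));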
Implement a read stream from an AWS S3 bucket file path; each read calls getObjectRange with the desired byte range.
import { AbstractFileSystemAction } from "./localFileSystemAction";
import { s3 } from 'service/fileSystem/awsS3Config';
import {
ListObjectsV2Command,
GetObjectCommand,
DeleteObjectCommand,
DeleteObjectsCommand,
_Object,
CreateMultipartUploadCommand,
UploadPartCommand,
CompleteMultipartUploadCommand
} from "@aws-sdk/client-s3";
import { jsonSecret } from "service/fileSystem/awsS3Config";
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import { Writable, Readable } from "node:stream";
export const getObjectRange = ({ bucket, key, start, end }) => {
const command = new GetObjectCommand({
Bucket: bucket,
Key: key,
Range: `bytes=${start}-${end}`,
});
return s3.send(command);
};
/**
* @param {string | undefined} contentRange
*/
export const getRangeAndLength = (contentRange: string) => {
contentRange = contentRange.slice(6);
console.log('getRangeAndLength input', contentRange);
const [range, length] = contentRange.split("/");
const [start, end] = range.split("-");
return {
start: parseInt(start),
end: parseInt(end),
length: parseInt(length),
};
};
export const isComplete = ({ end, length }) => end === length - 1;
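As a quick worked example (values are illustrative): when S3 answers a request for bytes 0-524287 of a 1,000,000-byte object, ContentRange is "bytes 0-524287/1000000"; getRangeAndLength strips the leading "bytes " prefix and parses the rest.
// Worked example with illustrative values:
const parsed = getRangeAndLength("bytes 0-524287/1000000");
// parsed = { start: 0, end: 524287, length: 1000000 }
console.log(isComplete(parsed)); // false, because 524287 !== 1000000 - 1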
export class AWSS3FileReadStream extends Readable {
filePath: string;
highWaterMark: number;
chunks: Uint8Array = new Uint8Array(); // buffer for bytes fetched from S3 before they are pushed downstream
lastRange = { start: -1, end: -1, length: -1 };
nextRange = { start: -1, end: -1, length: -1 };
constructor({ highWaterMark, filePath }) {
super({ highWaterMark });
this.highWaterMark = highWaterMark;
this.filePath = filePath;
}
_construct(callback: (error?: Error | null) => void): void {
console.log('AWS read stream _construct called');
callback();
}
chunksConcat(newChunk: Uint8Array): void {
var mergedArray = new Uint8Array(this.chunks.byteLength + newChunk.byteLength);
mergedArray.set(this.chunks);
mergedArray.set(newChunk, this.chunks.byteLength);
this.chunks = mergedArray;
}
_read(size) {
console.log('AWS read stream _read called');
if (isComplete(this.lastRange)) {
this.push(null);
return;
}
const { end } = this.lastRange;
this.nextRange = { start: end + 1, end: end + size, length: size };
getObjectRange({
bucket: jsonSecret.BUCKET_NAME ? jsonSecret.BUCKET_NAME : "",
key: this.filePath,
...this.nextRange,
}).then(({ ContentRange, Body }) => {
const contentRange = getRangeAndLength(ContentRange);
console.log('_read contentRange', contentRange);
this.lastRange = contentRange;
Body.transformToByteArray()
.then((chunk) => {
this.chunksConcat(chunk);
let uploadingChunk = this.chunks;
this.chunks = new Uint8Array(0);
console.log('read stream push chunk', uploadingChunk);
this.push(uploadingChunk);
})
.catch((error) => {
console.log('Body.transformToByteArray() error', error);
this.destroy(error); // propagate the failure instead of silently stalling the stream
});
}).catch((error) => {
console.log('getObjectRange error', error);
this.destroy(error);
});
}
_destroy(error, callback) {
callback();
}
}
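A matching usage sketch for the read stream (module path and file paths are assumptions); each _read issues one ranged GetObjectCommand, so a larger highWaterMark means fewer round trips to S3.
import fs from "node:fs";
import { AWSS3FileReadStream } from "service/fileSystem/awsS3FileReadStream"; // hypothetical module path

const readStream = new AWSS3FileReadStream({
  highWaterMark: 512 * 1024, // each _read() fetches roughly this many bytes per ranged request
  filePath: "upload/videos/example.mp4", // hypothetical S3 object key
});

const localFile = fs.createWriteStream("/tmp/example-download.mp4"); // hypothetical local destination
readStream.on("error", (err) => console.error("Download from S3 failed:", err));
readStream.pipe(localFile).on("finish", () => console.log("Download from S3 completed"));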
Extend multer.StorageEngine and implement two methods, _handleFile and _removeFile:
- _handleFile: initialize a multipart upload with the CreateMultipartUploadCommand command, get a read stream from the uploaded file (file.stream), read all file data into a buffer, then split the buffer into 5 MiB chunks and upload them to AWS one by one with the UploadPartCommand command; after finishing, call CompleteMultipartUploadCommand to finalize the upload process.
- _removeFile: call DeleteObjectCommand to remove the file in case of upload failure.
import { Request } from "express";
import multer from "multer";
import path from "path";
import {
CreateMultipartUploadCommand,
UploadPartCommand,
CompleteMultipartUploadCommand,
AbortMultipartUploadCommand,
S3Client,
DeleteObjectCommand,
} from "@aws-sdk/client-s3";
import { s3, jsonSecret } from "service/fileSystem/awsS3Config";
import { fileSystemPathObject } from "initFs";
type nameFnType = (req: Request, file: Express.Multer.File) => string;
type Options = {
nameFn?: nameFnType
}
const defaultNameFn: nameFnType = (
_req: Request,
file: Express.Multer.File
) => {
return file.originalname;
};
interface CustomFileResult extends Partial<Express.Multer.File> {
name: string;
}
export const multipartUploadPartSize = 5 * 1024 * 1024; // AWS UploadPartCommand set lower-bound size = 5MB (if the total file size > 5MB)
export class AWSS3CustomStorageEngine implements multer.StorageEngine {
private nameFn: nameFnType;
constructor(opts: Options) {
this.nameFn = opts.nameFn || defaultNameFn;
}
async _handleFile(req: Request, file: Express.Multer.File, callback: (error?: any, info?: Partial<Express.Multer.File>) => void): Promise<void> {
if (!s3) {
callback(new Error("S3 Client does not exist!"));
return; // stop here, there is no client to upload with
}
const fileName = this.nameFn(req, file);
const chunkReadStream = file.stream;
const chunkS3Path = fileSystemPathObject.uploadChunkFilePath(fileName);
let uploadId: string;
let tmpBuffer: Buffer = Buffer.alloc(0);
try {
const multipartUpload = await s3.send(
new CreateMultipartUploadCommand({
Bucket: jsonSecret.BUCKET_NAME ? jsonSecret.BUCKET_NAME : "",
Key: chunkS3Path,
}),
);
uploadId = multipartUpload.UploadId;
const uploadResults = [];
let partNumberCnt = 0;
chunkReadStream.on('data', async (chunk: Buffer) => {
tmpBuffer = Buffer.concat([tmpBuffer, chunk]);
});
let fileSize;
chunkReadStream.on('end', async () => {
fileSize = tmpBuffer.byteLength;
try {
console.log("******************************************************************************\n\nUpload chunk to server finish, start writing chunk to S3")
while (tmpBuffer.byteLength > 0) {
const cutBuffer = tmpBuffer.subarray(0, multipartUploadPartSize);
tmpBuffer = tmpBuffer.subarray(multipartUploadPartSize);
let partNumber = partNumberCnt + 1;
const uploadResult = await s3.send(new UploadPartCommand({
Bucket: jsonSecret.BUCKET_NAME ? jsonSecret.BUCKET_NAME : "",
Key: chunkS3Path,
UploadId: uploadId,
Body: cutBuffer,
PartNumber: partNumber
}))
console.log("Part", partNumber, " of the chunk, size: ", cutBuffer.length / (1024 * 1024), " MiB uploaded\n\n******************************************************************************");
uploadResults.push(uploadResult);
partNumberCnt += 1;
}
} catch (error) {
console.error('UploadPartCommand error:', error);
callback(error); // report the failure to Multer instead of completing a broken multipart upload
return;
}
try {
const res = await s3.send(
new CompleteMultipartUploadCommand({
Bucket: jsonSecret.BUCKET_NAME ? jsonSecret.BUCKET_NAME : "",
Key: chunkS3Path,
UploadId: uploadId,
MultipartUpload: {
Parts: uploadResults.map(({ ETag }, i) => ({
ETag,
PartNumber: i + 1,
})),
},
}),
);
callback(null, {
fieldname: 'video',
originalname: fileName,
size: fileSize
});
} catch (error) {
console.error('CompleteMultipartUploadCommand error:', error);
callback(error);
}
});
chunkReadStream.on('error', (error) => {
console.error('Error reading the file:', error);
callback(error); // surface stream errors to Multer
});
} catch (err) {
if (uploadId) {
const abortCommand = new AbortMultipartUploadCommand({
Bucket: jsonSecret.BUCKET_NAME ? jsonSecret.BUCKET_NAME : "",
Key: chunkS3Path,
UploadId: uploadId,
});
await s3.send(abortCommand);
}
callback(err); // propagate the failure to Multer after aborting the multipart upload
}
}
async _removeFile(req: Request, file: Express.Multer.File, callback: (error: Error | null) => void): Promise<void> {
const fileName = this.nameFn(req, file);
const chunkS3Path = fileSystemPathObject.uploadChunkFilePath(fileName);
const deleteCommand = new DeleteObjectCommand({
Bucket: jsonSecret.BUCKET_NAME ? jsonSecret.BUCKET_NAME : "",
Key: chunkS3Path,
})
try {
await s3.send(deleteCommand);
callback(null); // deletion succeeded
} catch (err) {
callback(err);
}
}
}
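The engine is consumed by passing it to multer as the storage option. A minimal sketch, assuming the module path below; the nameFn shown simply keeps the client-provided name, which is also the default behaviour:
import multer from "multer";
import { AWSS3CustomStorageEngine } from "service/fileSystem/awsS3CustomStorageEngine"; // hypothetical module path

// Multer instance backed by the custom S3 storage engine; every accepted file is
// uploaded to S3 under the key produced by fileSystemPathObject.uploadChunkFilePath.
export const upload = multer({
  storage: new AWSS3CustomStorageEngine({
    nameFn: (_req, file) => file.originalname,
  }),
});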
/**
* This is run right after Multer process upcoming chunk and store into tmp directory
* @param req
* @param res
*/
export const uploadChunk = async (req, res) => {
try {
if (req.file) {
const dbModule = await import('service/database/lowdb');
const db: Low<{}> = await dbModule.default;
/**
* Multer check and saved chunk successfully
*/
const baseFileName = req.file.originalname.replace(/\s+/g, '');
/**
* Filename: ex-machina.part_1
*
* -> filename: ex-machina
* -> part: 1
*/
const fileName: string = FilenameUtils.getBaseName(baseFileName);
const partNo: number = FilenameUtils.getPartNumber(baseFileName);
if (!db.data[fileName]) {
throw `Server does not recognize the chunk's video filename ${fileName}`;
} else if (typeof partNo !== 'number') {
throw 'File chunk must contain a part number, in the form <movie-name>.part_<no>!';
} else if (partNo >= db.data[fileName].length) {
throw `PartNo out of range for filename ${fileName}`;
} else {
/**
* Check is last chunk
*/
await db.update((data) => data[fileName][partNo] = true);
const uploadChunksState = db.data[fileName];
const isFinish = (uploadChunksState.length > 0) && (uploadChunksState.filter((chunkState: boolean) => chunkState == false).length == 0);
if (isFinish) {
/**
* True code
*/
await UploadUtils.mergeChunks(db, fileName, undefined);
VideoProcessUtils.generateMasterPlaylist(fileName);
/**
* Mock test response failed on last part to test cancelling upload
*/
// throw 'Upload part error';
}
res.send({
partNo: partNo,
fileName: fileName,
success: true,
})
}
} else {
throw 'Multer did not accept chunk!';
}
} catch (error) {
console.error('uploadChunk error:', error);
res.status(400).send({ error, success: false });
}
}
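Wiring this together in an Express router might look like the sketch below. The route path is an assumption; the "video" field name matches the fieldname reported by the storage engine, and upload/uploadChunk refer to the multer instance and handler shown above (their import paths are project-specific and omitted).
import express from "express";
// `upload` is the multer instance built with AWSS3CustomStorageEngine above and
// `uploadChunk` is the handler above; import them from their respective modules.

const router = express.Router();
// Hypothetical route: Multer streams the "video" field through the storage engine first,
// then uploadChunk records the chunk state and triggers the merge once every part has arrived.
router.post("/upload/chunk", upload.single("video"), uploadChunk);

export default router;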
import { Low } from 'lowdb/lib';
import path from 'path';
import { fileSystemActionObject, fileSystemPathObject } from 'initFs';
export class UploadUtils {
public static async mergeChunks(db: Low<{}>, fileName: string, chunkSum: number | undefined): Promise<void> {
const MAX_RETRIES = 5;
const RETRY_DELAY = 1000; // 1 second
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
/**
* Open file stream, adding chunks into file by command:
* chunkStream.pipe(writeStream, {end: false})
*/
const finalFilePath = fileSystemPathObject.uploadVideoFilePath(fileName);
const writeStream = fileSystemActionObject.createWriteStream(finalFilePath, { highWaterMark: 5 * 1024 * 1024 }); // 5MiB buffer
const uploadChunksState = db.data[fileName];
const totalPart = chunkSum ?? uploadChunksState.length;
for (let i = 0; i < totalPart; i++) {
const chunkName = `${fileName}.part_${i}`;
console.log('chunkName', chunkName);
let retries = 0;
while (retries < MAX_RETRIES) {
try {
const chunkPath = fileSystemPathObject.uploadChunkFilePath(chunkName);
const readStream = fileSystemActionObject.createReadStream(chunkPath, { highWaterMark: 512 * 1024 }); // 512 KiB each read
await new Promise<void>((resolve, reject) => {
fileSystemActionObject.addEventListenerReadStream(readStream, 'end', () => {
console.log('Readstream part ', i, ' end');
fileSystemActionObject.rmFile(chunkPath);
resolve();
});
fileSystemActionObject.addEventListenerReadStream(readStream, 'error', (err) => {
console.error(`Error reading chunk ${chunkName}:`, err);
reject(err);
});
try {
fileSystemActionObject.pipeReadToWrite(readStream, writeStream, { end: false });
} catch (error) {
console.error('Piping read stream to write stream error', error);
}
});
break;
} catch (error) {
console.error(`Attempt ${retries + 1} failed for ${chunkName}. Retrying...`);
retries += 1;
if (retries < MAX_RETRIES) {
await delay(RETRY_DELAY);
} else {
console.error(`Failed to process chunk ${chunkName} after ${retries} retries.`);
}
}
}
}
await new Promise<void>((resolve, reject) => {
fileSystemActionObject.addEventListenerWriteStream(writeStream, 'finish', () => {
console.log('_final has finished and callback() was called');
resolve();
});
writeStream.end(); // Write stream end to trigger _final handler
});
}
...
}