CSV Streaming to Azure Blob Storage

Trucking and logistics industry leader wanted a more flexible solution for managing historical data

Challenge

Our client tracks GPS data for fleets of trucks using the Samsara platform in order to perform big-data analysis with R-Studio. As they onboarded new trucking companies, they needed a solution that tracked not only each truck's latitude and longitude in real time but also its historical data. To collect and process the data while keeping their existing system in place, they wanted CSV files snapshotted twice monthly onto a file server, with the underlying data refreshed every 30 seconds to provide the detailed level of analysis they required.

Strategy

To meet these requirements, Cloudwell used a Node.js Azure Function app to process the data on a schedule, Blob storage to hold the running CSVs, and an on-premises data gateway to transfer the files to a file share for R-Studio analysis. Azure Functions let us control the schedules for the live stream and the historical snapshots with CRON expressions while running lightweight, serverless code backed by Azure's robust infrastructure. Blob storage gives us redundancy if the on-premises server goes down, and Blob streaming keeps the running CSV up to date in real time.
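
Each function is driven by a timer trigger whose CRON expression lives in function.json. The sketch below shows roughly what the snapshot function's entry point could look like, assuming the v3 Node.js programming model; the fetchVehicleGps helper, module path, company name, and CRON expressions are illustrative placeholders rather than the production code.

// Illustrative timer-triggered entry point; the binding in function.json would carry a
// schedule such as "0 0 0 1,15 * *" (twice monthly) or "*/30 * * * * *" (every 30 seconds).
import { AzureFunction, Context } from '@azure/functions';
import { getDataSnapshotAsCsv, uploadCsv } from './csv'; // helpers shown later in this section (path is illustrative)

// Hypothetical helper that calls Samsara's REST API for historical vehicle locations.
declare function fetchVehicleGps(): Promise<any[]>;

const snapshotTimer: AzureFunction = async (context: Context): Promise<void> => {
    // Pull the latest GPS records from Samsara and stream them into Blob storage as a CSV.
    const data = await fetchVehicleGps();
    const csvStream = getDataSnapshotAsCsv(context, data);
    await uploadCsv(context, csvStream, 'Example Trucking Co', `gps-${new Date().toISOString()}.csv`);
};

export default snapshotTimer;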

To deliver the solution for our client, we wrote a function app that uses Samsara's REST API to query historical vehicle data. The data comes back from Samsara as an array of records, one per vehicle ID, each with its own set of latitude and longitude readings, so we needed to run the array through Array.prototype.flatMap to create a flat structure of one row per GPS record with the appropriate vehicle ID and name attached. We then looped over the flat data set and pushed each record onto a Node.js Readable stream. Finally, we passed the stream as the data argument of the BlockBlobClient.uploadStream() method. Without streaming, the size of the dataset would have made the upload impossible: Node.js throws an out-of-memory error when it has to hold the full CSV for a large dataset. For live streams, we used the AppendBlobClient class to keep the running CSV up to date as new data arrives.

import { Context } from '@azure/functions';
import { Readable } from 'stream';
import { BlobServiceClient, BlockBlobUploadStreamOptions } from '@azure/storage-blob';

// Minimal shape of a Samsara vehicle record, inferred from the fields used below;
// the project defines its own richer Datum type.
interface Datum {
    id: string;
    name: string;
    gps: {
        time: string;
        latitude: number;
        longitude: number;
        headingDegrees: number;
        speedMilesPerHour: number;
        isEcuSpeed: boolean;
        reverseGeo?: { formattedLocation: string };
        address?: { id: string; name: string };
    }[];
}

export const getDataSnapshotAsCsv = (context: Context, data: Datum[]): Readable => {
    const fields = ['id', 'name', 'gps.time', 'gps.latitude', 'gps.longitude', 'gps.headingDegrees',
        'gps.speedMilesPerHour', 'gps.reverseGeo.formattedLocation', 'gps.address.id', 'gps.address.name', 'gps.isEcuSpeed'];

    // Start the stream with the CSV header row.
    const fileHeader = fields.join(',');
    const readable = new Readable({ read() { /* no-op: all data is pushed below */ } });
    readable.push(`${fileHeader}\r\n`);

    // Flatten the Samsara response into one row per GPS record, tagged with its vehicle id and name.
    const dataFlat = data.flatMap(d => d.gps.map(g => ({ ...g, id: d.id, name: d.name })));

    dataFlat.forEach(record => {
        readable.push(`"${record.id}","${record.name}","${record.time}","${record.latitude}","${record.longitude}","${record.headingDegrees}","${record.speedMilesPerHour}","${record.reverseGeo?.formattedLocation || ''}","${record.address?.id || ''}","${record.address?.name || ''}","${record.isEcuSpeed}"\r\n`);
    });

    // Signal the end of the stream.
    readable.push(null);
    return readable;
};

export const uploadCsv = async (context: Context, csvAsStream: Readable, companyName: string, fileName: string, options?: BlockBlobUploadStreamOptions): Promise<void> => {
    const connectionString = process.env.AZURE_STORAGE_CONNECTION_STRING;
    if (!connectionString) {
        throw new Error('AZURE_STORAGE_CONNECTION_STRING is not configured.');
    }

    // One container per company, derived from the company name.
    const blobServiceClient = BlobServiceClient.fromConnectionString(connectionString);
    const containerName = companyName.toLowerCase().replace(/ /g, '-');
    const containerClient = blobServiceClient.getContainerClient(containerName);
    await containerClient.createIfNotExists();

    const blockBlobClient = containerClient.getBlockBlobClient(fileName);

    // Stream the CSV in 4 MB chunks with up to 20 buffers in flight, so large datasets never have to fit in memory.
    const bufferSize = 4 * 1024 * 1024;
    const maxBuffers = 20;
    await blockBlobClient.uploadStream(csvAsStream, bufferSize, maxBuffers, options);
};
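
The snapshot path above rewrites the whole blob on each run. For the live stream, the running CSV is an append blob that grows with every 30-second batch of rows. The sketch below shows roughly how that path could look; the helper name, blob naming, and batch format are our own illustrative choices rather than the production code's exact AppendBlobClient usage.

import { BlobServiceClient } from '@azure/storage-blob';

// Append one batch of already-formatted CSV rows to the running CSV for a company.
export const appendLiveRows = async (companyName: string, fileName: string, csvRows: string): Promise<void> => {
    const connectionString = process.env.AZURE_STORAGE_CONNECTION_STRING;
    if (!connectionString) {
        throw new Error('AZURE_STORAGE_CONNECTION_STRING is not configured.');
    }

    const blobServiceClient = BlobServiceClient.fromConnectionString(connectionString);
    const containerClient = blobServiceClient.getContainerClient(companyName.toLowerCase().replace(/ /g, '-'));
    await containerClient.createIfNotExists();

    // Append blobs can only grow, which suits a running CSV that gains rows every 30 seconds.
    const appendBlobClient = containerClient.getAppendBlobClient(fileName);
    await appendBlobClient.createIfNotExists();

    await appendBlobClient.appendBlock(csvRows, Buffer.byteLength(csvRows));
};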

Results

Once the data is in Blob storage, we snapshot the CSVs twice monthly and store them on the client's file share using an on-premises data gateway. As a result, our client can now reliably access the CSV files and process them with R-Studio without ever having to leave their existing server.

CSVs in Azure Blob Storage