Integrating Stripe with BigQuery

One of the projects that I mentioned in my post a couple of weeks ago was the migration of our billing system to Stripe. Stripe is widely used for billing on the internet, in both SaaS and non-SaaS use cases. A while back, I wrote about the general flexibility limitations of iPaaS platforms, and Stripe exposes a lot of them.

One particular product did not expose all of the object types we needed to extract from Stripe. Another simply did not sync all of the object types it claimed to be syncing. A third had a clear bug in which it wrote the current date/time into all date fields. In each of these cases, we were left to file support tickets and wait. I moved on.

Stripe’s API is both feature-rich and complicated. Each object type returns deeply nested JSON documents – which is necessary given the level of financial detail that Stripe must support. I have previously written about how I used FME to access arbitrary REST APIs and process the payloads. In the case of Stripe, the number of objects needed and the complexity of each made this approach impractical. There was simply not a “low-code” solution available, so I had to build my own.

The process I chose was pretty simple:

1. Query Stripe objects via their respective APIs
2. Flatten each JSON document into a row (see the sketch after this list)
3. Append the row to an array
4. Convert the array to a CSV
5. Upload the CSV to Google Cloud Storage (GCS)
6. Import from GCS to BigQuery
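To make step 2 concrete, here is a minimal sketch of the flattening, using the flat package and a made-up fragment of a customer object (the field values are illustrative, not real Stripe data):

const flatten = require('flat');

// a simplified, hypothetical slice of a Stripe customer object
const customer = {
  id: 'cus_123',
  invoice_settings: { default_payment_method: 'pm_abc' },
  metadata: { plan: 'pro' }
};

// nested keys collapse into underscore-delimited column names
console.log(flatten(customer, { delimiter: '_' }));
// { id: 'cus_123',
//   invoice_settings_default_payment_method: 'pm_abc',
//   metadata_plan: 'pro' }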

It’s worth noting that I chose this approach over processing webhook events from Stripe. Stripe is good about using events for real-time updates, but they incur a lot of processing overhead that wasn’t necessary for our reporting needs. A straightforward daily sync was sufficient for most of the objects in Stripe.

Steps 1 through 5 above were best accomplished with a Node script for each object type. (I could have made a generic script, but there was some variation in how we wanted to process individual objects, so I opted for dedicated scripts.) The code at the end of this post shows a script for processing customer objects. It is the same basic pattern I used for each type of object.

Each script was installed on an instance and set to run via a cron job. It's old-fashioned but reliable. Once a day, new, well-formatted CSVs are pushed to GCS for further processing. A final cron job runs to clean up the local copies of the CSVs after upload. This approach is really flexible: if and when we add new metadata tags to objects, they automatically appear as new columns on the next run.
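For reference, the scheduling on the instance might look something like the crontab below; the node path, script locations, and times are placeholders rather than our actual configuration:

# run each Stripe extract script shortly after midnight (illustrative paths)
15 0 * * * /usr/bin/node /opt/stripe-export/customers.js
20 0 * * * /usr/bin/node /opt/stripe-export/charges.js
# clean up the local CSV copies once the uploads have had time to finish
0 2 * * * rm -f /home/exporter/stripe_*.csv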

From here, FME took over. I briefly considered using scheduled jobs in GCP to write the data to BigQuery, but found cases where I wanted to do some interim transformations. FME was much more straightforward for that. It has a native GCS reader and a native BigQuery writer, and its wealth of transformers made it an easy choice.
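For comparison, the GCP-native route I passed on would have looked roughly like the sketch below, using the official Node clients; the dataset, table, and bucket names are placeholders. It works, but it offers no obvious place to hang interim transformations:

const { BigQuery } = require('@google-cloud/bigquery');
const { Storage } = require('@google-cloud/storage');

async function loadCustomersCsv(filename) {
  const bigquery = new BigQuery();
  const storage = new Storage();
  // load the CSV straight from GCS into BigQuery, replacing the previous load
  const [job] = await bigquery
    .dataset('billing')                 // placeholder dataset
    .table('stripe_customers')          // placeholder table
    .load(storage.bucket('your_gcs_bucket').file(filename), {
      sourceFormat: 'CSV',
      skipLeadingRows: 1,               // the CSVs include a header row
      autodetect: true,                 // let BigQuery infer the schema from the flattened columns
      writeDisposition: 'WRITE_TRUNCATE'
    });
  console.log(`Job ${job.id} completed`);
}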

The approach of pre-processing the complexity of the Stripe data and then passing it off to FME for ETL into BigQuery seems to be a happy medium for now. As a result, our data warehouse has billing data that lives side-by-side with the data from our other systems, making it easy to bring into our analytic and reporting workflows.

const fs = require('fs')
const path = require('node:path');
const homedir = require('os').homedir();
const flatten = require('flat')                 // flattens nested JSON into single-level objects
const { Parser } = require('json2csv');         // converts the array of flat objects to CSV
const gcloud = require('./helpers/google-cloud-storage.js') // this stores connection params

let lineNum = 0;
const out = [];

const driver = module.exports.driver = async function () {
  // set up the connection to Stripe
  const stripe = require('stripe')('YOUR-API-KEY', { apiVersion: '2020-08-27' });
  // list all customers and process each one via auto-paging
  await stripe.customers.list()
    .autoPagingEach(function (element) {
      processSubItem(element, stripe)
    });
  // process the entire out[] array to CSV. Must be done all at once to account for variations in row length
  const parser = new Parser({ header: true });
  const csv = parser.parse(out);
  const filename = `${homedir}/stripe_customers_${new Date().toJSON().slice(0, 10)}.csv`
  // save the CSV locally
  fs.writeFileSync(filename, csv)
  // upload it to GCS
  gcloud.copyFileToGCS(path.resolve(filename), 'your_gcs_bucket')
}

const processSubItem = module.exports.processSub = function (sub, api) {
  // flatten the JSON hierarchy, using underscores to build column names;
  // the Stripe handle (api) is passed through but not needed for customers
  try {
    const obj = flatten(sub, { delimiter: '_' });
    out.push(obj)
    lineNum = lineNum + 1;
  } catch (err) {
    console.error(err);
  }
}

driver();
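The google-cloud-storage.js helper isn't shown above. A minimal sketch of what it might contain, assuming the official @google-cloud/storage client and a service-account key file (both assumptions on my part, not the actual helper):

// helpers/google-cloud-storage.js (hypothetical sketch)
const { Storage } = require('@google-cloud/storage');

// connection params live here; the key file path is a placeholder
const storage = new Storage({ keyFilename: '/path/to/service-account.json' });

// upload a local file to the named bucket, keeping the original filename
module.exports.copyFileToGCS = function (localFilePath, bucketName) {
  return storage.bucket(bucketName).upload(localFilePath);
};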