Real-time Publishing to Socrata
Processing and publishing Air Quality data for the Sheffield datastore

This is the first of two posts discussing the publishing of real-time Air Quality data in Sheffield. In this post we’ll look at the technical specifics of where the data comes from, how it is processed and where it ends up. In the follow-up post, we’ll consider some of the softer issues around this process.

We already have a process for scraping the real-time measurements from the Sheffield Air Quality site. This process maintains a cursor of the highest timestamp seen for each station/sensor and requests any data since that timestamp. The actual list of stations and sensors is scraped using a separate script, which also parses the HTML from that site. Both of these scripts do a substantial amount of data verification, validation and cleanup (normalisation of dates, creation of identifiers, etc). The source code for the scripts is available on GitHub. These scripts incrementally populate our Virtuoso SPARQL endpoint for dynamic querying and joining with other related datasets.
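To make the cursor mechanism concrete, the sketch below shows the general shape of that incremental fetch loop. It is a minimal illustration only: the cursors.properties file, the fetchMeasurementsSince helper and the stations_and_sensors list are assumptions for the example, not the actual scraper code (which is on GitHub).

// Minimal sketch of the incremental scrape: remember the highest timestamp
// seen per station/sensor and only ask the AQ site for newer measurements.
// fetchMeasurementsSince(), stations_and_sensors and the cursor file are hypothetical.
def cursorFile = new File('cursors.properties')
def cursors = new Properties()
if (cursorFile.exists()) {
  cursorFile.withInputStream { cursors.load(it) }
}

stations_and_sensors.each { s ->
  def key = "${s.station_id}/${s.sensor_id}"
  def since = cursors.getProperty(key, '1970-01-01T00:00:00')

  // Request only readings newer than the cursor for this station/sensor
  def new_rows = fetchMeasurementsSince(s, since)

  if (new_rows) {
    // Advance the cursor to the newest timestamp we have now seen
    cursors.setProperty(key, new_rows*.timestamp.max())
  }
}

cursorFile.withOutputStream { cursors.store(it, 'per station/sensor high-water marks') }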

It seemed natural to extend this scrape/publish script with a second target for the data, so we now take each measurement and broadcast it to all the required endpoints. The Sheffield air quality data is now also in the Socrata system here: https://data.sheffield.gov.uk/Environment/Live-Air-Quality-Data-Stream/mnz9-msrb.
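The fan-out itself is just a matter of handing the same rows to each target in turn, along the lines of the sketch below. Here updateSparqlEndpoint and publishBatch are stand-in names for the existing Virtuoso update step and the calling code, not functions from the actual scraper.

// Hypothetical sketch of the fan-out: the same batch of rows goes to every target.
// updateSparqlEndpoint() and publishBatch() are placeholder names for illustration.
def publishBatch(data_rows, token, un, pw) {
  updateSparqlEndpoint(data_rows)          // existing target: the Virtuoso SPARQL store
  pushToSocrata(data_rows, token, un, pw)  // new target: the Socrata dataset, shown below
}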

Pushing data into the Socrata platform programmatically is reasonably straightforward once you’ve worked out the basic assumptions. The following code snippet is a Groovy fragment for pushing a list of values in.

import groovyx.net.http.HTTPBuilder
import org.codehaus.groovy.runtime.MethodClosure
import static groovyx.net.http.Method.POST

def pushToSocrata(data_rows, token, un, pw) {

  println("Pushing to socrata [${token},${un},${pw}]")

  // Nothing to do if there are no new measurements
  if ( data_rows == null || data_rows.size() == 0 )
    return

  try {
    // Column headings must match the field names in the Socrata dataset
    def colheads = "ssn_measurement_id,ssn_sensor_id,ssn_measurement_time,ssn_measurement_value"
    def http = new HTTPBuilder( 'https://data.sheffield.gov.uk' )

    // Always handle text/csv as text/plain
    http.encoder.putAt('text/csv', new MethodClosure(http.encoder, 'encodeText'))

    // Build the CSV body: header row, then one CRLF-terminated line per measurement
    def sw = new StringWriter()
    sw.write(colheads)
    data_rows.each { row ->
      sw.write('\r\n"' + row[0] + '"')
      sw.write(',"' + row[1] + '"')
      sw.write(',"' + row[2] + '"')
      sw.write(',' + row[3])
    }
    sw.write('\r\n')

    def content = sw.toString()
    println("\n\nUpload string:")
    println(content)
    println("\n\n")

    // HTTP Basic auth header value: base64 of "username:password"
    def auth_str = "${un}:${pw}".bytes.encodeBase64().toString()

    println("Auth header: ${auth_str}")

    http.request( POST ) { req ->
      uri.path = '/resource/mnz9-msrb.json'
      headers.'Authorization' = "Basic ${auth_str}"
      headers.'X-App-Token' = token
      requestContentType = 'text/csv'
      body = content

      response.success = { resp, json ->
        println "POST response status: ${resp.statusLine}"
        println json
      }

      response.failure = { resp ->
        println("\n\n****Failure: ${resp.statusLine} ${resp.status} ${resp}\n\n")
      }
    }
  }
  catch ( Exception e ) {
    e.printStackTrace()
  }

  println("Push to socrata completed")
}

In essence, the tricky parts are making sure you have the right resource URI for Socrata and registering a content encoder for text/csv. There don’t seem to be any examples of using HTTPBuilder to post text/csv out there in the real world.
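For completeness, a call to the function above looks something like the following. The rows and credentials are obviously invented for illustration; the column order matches the colheads string in the snippet (measurement id, sensor id, time, value).

// Hypothetical invocation with made-up rows and credentials.
// Column order matches colheads: measurement id, sensor id, time, value.
def rows = [
  ['m-0001', 'sensor-example-no2', '2014-05-01T10:15:00', 42.0],
  ['m-0002', 'sensor-example-no2', '2014-05-01T10:30:00', 39.5]
]
pushToSocrata(rows, 'YOUR_APP_TOKEN', 'publisher@example.org', 'secret')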

We’ve reused the row-level identifiers that the existing publishing tool creates, and adopted a system whereby repeating properties for sensors and stations are referenced from a separate file. Because the Socrata platform does not allow joins between datasets, users will need to do a degree of cross-referencing of files. Alternatively, just use the SPARQL endpoint at http://apps.opensheffield.org/sparql. There are examples of queries here.
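By way of illustration, that cross-referencing is just a lookup on ssn_sensor_id from client code. The sketch below assumes a downloaded measurements CSV plus a hypothetical sensors.csv; its file name and its station_name column are invented for the example, and the naive parser ignores quoted commas.

// Hedged sketch: join measurements to sensor/station metadata on ssn_sensor_id.
// sensors.csv and its station_name column are hypothetical; only the published
// measurement column names are taken from the dataset above.
def parseCsv = { File f ->
  def lines = f.readLines()
  def heads = lines.head().split(',').toList()
  lines.tail().collect { line ->
    def vals = line.split(',').toList()
    [heads, vals].transpose().collectEntries { pair -> [(pair[0]): pair[1].replaceAll('"', '')] }
  }
}

def sensors = parseCsv(new File('sensors.csv')).collectEntries { [(it.ssn_sensor_id): it] }
def measurements = parseCsv(new File('measurements.csv'))

measurements.each { m ->
  def sensor = sensors[m.ssn_sensor_id]
  println "${sensor?.station_name} ${m.ssn_measurement_time} ${m.ssn_measurement_value}"
}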

Currently, there are over 15 years of historic data in the AQ+ SPARQL endpoint. In discussion with our colleagues at Socrata, we’ve decided to take today as “Day Zero” for data publication, so data.sheffield.gov.uk will contain data from now going forward.

Ian Ibbotson

Open source developer and contributor working in libraries, local government, culture and learning.