CJAC is a community oversight task force founded to research ways to improve the overcrowded, dangerous conditions at OKC’s local jail, the Oklahoma County Detention Center (OCDC).
They advise the Jail Trust, which was created to take control of OCDC away from the Sheriff when conditions deteriorated to the point of crisis in 2019.
1.2 The project: What is CJAC?
1.3 The project: My role with CJAC
My role: CJAC’s go-to data person
I developed and maintain a Shiny dashboard for jail data.
I also provide custom reports and datasets, and answer research questions, for CJAC’s Director and members.
These include Oklahoma County’s DA, members of our Public Defender’s office, court officials, etc.
I’ve also been able to provide this data to others beyond CJAC, like the OKC Diversion Hub and Homeless Alliance.
1.4 The project: CJAC Version 2.0
One of my first tasks when I started in 2020 was building our current CJAC data pipeline / dashboard.
We store everything in our own database, so we can access it easily with our R package.
My long, ongoing, nearly-complete project to overhaul and improve this pipeline is what I’m going to show you today!
First, I’ll show you a bit more of what things looked like originally.
1.5 The project: The CJAC Pipeline
# This is what our R package for accessing the data looks like in action
library(ojodb)
library(dplyr)
library(lubridate)
library(ggplot2)

ocdc_data <- ojo_tbl(schema = "ocdc", table = "arrest") |>
  select(-c(starts_with("def_"), fbi_num, state_id)) |>
  filter(book_date >= "2016-01-01") |> # Very SQL-like syntax
  ojo_collect()

# How many people were booked into the jail each year?
ocdc_data |>
  count(year = floor_date(book_date, "years")) |>
  ggplot(aes(x = year, y = n)) +
  geom_col()
The main limitation: we had to pull reports by hand from the jail’s clunky, limited data system to get to the data.
I’ve always wanted to overhaul things, but getting Jailtracker (the contractor managing the jail’s data system) to help us set up a direct connection was difficult.
After a very long, often one-sided email thread, I finally broke through late last year, and now version 2.0 of the CJAC pipeline is nearing completion!
1.8 Objective & Motivation
Objective:
Create a new, fully automated pipeline with richer data, and a dashboard to go with it
Motivation:
Identify problems and enable better decision-making on a daily basis in the jail & court system
Open up data to public oversight
Help guide CJAC’s future in Oklahoma County
2 The data
2.1 The data: Jailtracker ➡ GCS
Jailtracker still wouldn’t give us direct access to the database itself, but they did offer to give us a daily export of the tables we needed via File Transfer Protocol (FTP).
To start, I began poring through their schema to determine which tables we’d need for our pipeline.
After a little back-and-forth, we eventually got the FTP server and the export set up. We use Google Cloud, so we set up our FTP server to dump the files into a GCS Bucket.
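Each exported file lands in the bucket under a name that the API later splits into a table name and a timestamp (that's the job of `parse_jailtracker_xml_file_name()` in the API code). Here's a minimal base R sketch of that idea. The helper name `parse_export_file_name()`, the sample file name, and the 14-digit `YYYYMMDDHHMMSS` timestamp layout are all assumptions for illustration, not the actual export spec.

```r
# Hypothetical helper: split an export file name like
# "InmateArrestInfo_20240102120000.xml" into its table name and timestamp.
# The 14-digit timestamp layout is an assumption, not Jailtracker's documented format.
parse_export_file_name <- function(name) {
  pattern <- "^(\\w+)_(\\d{14})\\.xml$"
  parts <- regmatches(name, regexec(pattern, name, perl = TRUE))[[1]]

  # No match means the file isn't one of the exports we expect
  if (length(parts) == 0) {
    return(NULL)
  }

  list(
    file_type = parts[2],
    timestamp = as.POSIXct(parts[3], format = "%Y%m%d%H%M%S", tz = "America/Chicago")
  )
}

parsed <- parse_export_file_name("InmateArrestInfo_20240102120000.xml")
print(parsed$file_type)
print(parsed$timestamp)

# Unexpected files return NULL, so they can be skipped or logged
unexpected <- parse_export_file_name("notes.txt")
```

Returning `NULL` for anything that doesn't match keeps stray files in the bucket from being routed to a database table by accident.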
2.2 The data: Jailtracker ➡ GCS
2.3 The data: GCS ➡ Database
When new files hit the GCS Bucket each day, they trigger an API I wrote with {plumber} in R and deployed via Google Cloud Run/Build.
This API then parses the data (which arrives in .xml format), performs some cleaning and quality checks, then upserts the data to the proper place in our database.
The result is a mirror of the database tables we’re interested in, updated daily at around 12pm.
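In case "upsert" is unfamiliar: rows whose primary key already exists get updated, and rows with new keys get inserted. The real pipeline does this against the database with `dplyr::rows_upsert()`; the base R toy below only illustrates the semantics. `upsert_rows()` and the two small data frames are made up for this example.

```r
# Toy illustration of upsert semantics using base R only.
# `upsert_rows()` is a hypothetical helper, not part of the pipeline.
upsert_rows <- function(existing, incoming, key) {
  stopifnot(key %in% names(existing), key %in% names(incoming))
  stopifnot(identical(names(existing), names(incoming)))

  # Drop existing rows whose key shows up in the incoming data...
  kept <- existing[!existing[[key]] %in% incoming[[key]], , drop = FALSE]

  # ...then append the incoming rows (updates and brand-new rows alike)
  out <- rbind(kept, incoming)
  out <- out[order(out[[key]]), , drop = FALSE]
  rownames(out) <- NULL
  out
}

# Made-up stand-ins for a database table and one day's export
db_table <- data.frame(
  arrest_no = c(1, 2, 3),
  status = c("in custody", "in custody", "released"),
  stringsAsFactors = FALSE
)
daily_export <- data.frame(
  arrest_no = c(2, 4),
  status = c("released", "in custody"),
  stringsAsFactors = FALSE
)

mirrored <- upsert_rows(db_table, daily_export, key = "arrest_no")
print(mirrored)
```

Arrest 2 is updated in place, arrest 4 is added, and arrests 1 and 3 are left alone, which is why re-sending the same export twice doesn't create duplicates.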
2.4 The data: GCS ➡ Database
Here’s my API for accomplishing all of this, in case you’re curious!
library(plumber)
library(jsonlite)
library(logger)
library(googleCloudStorageR)
library(gargle)
library(dplyr)
library(readr)
library(lubridate)
library(janitor)
library(purrr)
library(xml2)
library(stringr)
library(fs)
library(snakecase)

config <- config::get()

Sys.setenv("OJO_HOST" = config$database$server)
Sys.setenv("OJO_PORT" = config$database$port)
Sys.setenv("OJO_DEFAULT_USER" = config$database$uid)
Sys.setenv("OJO_DEFAULT_PASS" = config$database$pwd)
Sys.setenv("OJO_SSL_MODE" = "verify-ca")
Sys.setenv("OJO_SSL_ROOT_CERT" = config$database$ssl.ca)
Sys.setenv("OJO_SSL_CERT" = config$database$ssl.cert)
Sys.setenv("OJO_SSL_KEY" = config$database$ssl.key)

# Get a token that has permission to interact with GCS and authorize
scope <- c("https://www.googleapis.com/auth/cloud-platform")
token <- gargle::token_fetch(scopes = scope)
gcs_auth(token = token)

#* Catch GCP Eventarc Events
#* @post /
function(req) {
  # Store the response so we can return it after all other logic
  res <- tryCatch(
    {
      # Extract structured fields from eventarc json body
      extracted_data <- list(
        id = req$body$id,
        bucket = req$body$bucket,
        name = req$body$name,
        generation = req$body$generation,
        content_type = req$body$contentType,
        created_at = req$body$timeCreated
      )

      logger::log_info("Request Received: Eventarc JSON payload: {extracted_data$name}")

      # In-memory XML object (NULL if the download failed)
      xml_object <- get_jailtracker_xml(extracted_data$name)

      if (is.null(xml_object)) {
        # If the download failed, don't even try to parse or upsert
        logger::log_error("Failed to read XML data from GCS!")
        res <- list(
          message = "Failed to read XML data from GCS!",
          error = "Failed to read XML data from GCS"
        )
      } else {
        # List containing `file_type` and `timestamp`
        xml_object_metadata <- parse_jailtracker_xml_file_name(extracted_data$name)

        # Pass args to read_ function to get parsed data in nice tibble format
        parsed_data <- read_ftp_xml(
          raw_xml = xml_object,
          table_name = xml_object_metadata$file_type
        )

        # Pass parsed tibble to upsert function
        upsert_parsed_data_to_db(
          parsed_data = parsed_data,
          file_name = xml_object_metadata$file_type
        )

        # Attach needed fields to the response
        res <- list(
          message = "Successfully parsed the Eventarc JSON payload",
          payload = extracted_data
        )
      }

      # Log our response
      logger::log_info("{res}")

      # The last value of this block becomes the value of tryCatch()
      res
    },
    error = \(e) {
      # If anything fails along the way, just log it
      res <- list(
        message = "Failed to parse JSON payload from Eventarc",
        error = e$message
      )
      logger::log_error("{res}")

      # This returns to the `res` variable NOT the endpoint handler
      return(res)
    }
  )

  # Clear out old or unexpected files from the bucket, or log failures
  # prune_bucket() # removed for now

  # Return the response we made in the tryCatch
  return(res)
}

prune_bucket <- function(bucket = "jailtracker-sftp", max_age = 30) {
  threshold <- lubridate::now() - lubridate::days(max_age)

  # Get all objects that are older than max age or not xml files
  all_objects <- gcs_list_objects(
    bucket = bucket,
    detail = "more"
  ) |>
    dplyr::as_tibble()

  n_all_objects <- nrow(all_objects)

  if (n_all_objects > 0) {
    objects <- all_objects |>
      dplyr::filter(
        (updated < threshold) | (!stringr::str_detect(contentType, "/xml"))
      )
  } else {
    objects <- tibble::tibble()
  }

  n_objects <- nrow(objects)

  # Only delete if we get a match, else skip to log
  if (n_objects > 0) {
    logger::log_debug("Cleanup: Deleting {n_objects} object(s) from bucket")

    # Iterate over the names, trying to delete each one
    objects$name |>
      purrr::walk(
        \(x) {
          # Delete object returns a boolean, true if successful
          success <- gcs_delete_object(
            object_name = x,
            bucket = bucket
          )
          if (success) {
            logger::log_debug("Deleted object: {x}")
          } else {
            logger::log_error("Failed to delete object: {x}")
          }
        }
      )
  } else {
    logger::log_info("Cleanup: No objects older than {max_age} day(s) or of unexpected content type")
  }
}

get_jailtracker_xml <- function(name, bucket = "jailtracker-sftp") {
  temp_path <- paste0("~/temp-", name, ".xml")

  # Save the temp file path as the saveToDisk argument
  gcs_get_object(
    object_name = name,
    bucket = bucket,
    saveToDisk = temp_path, # Save the xml file to the disk
    parseObject = FALSE,
    overwrite = TRUE
  )

  # Check for success (log first, then bail out)
  if (!fs::file_exists(temp_path)) {
    logger::log_error("Failed to get object from GCS and save as temp file.")
    return(NULL)
  }

  # Use read_lines on the temp file path
  x <- readr::read_lines(temp_path)

  # If all has worked, delete the temp file and return the readr parsed object
  fs::file_delete(temp_path)
  logger::log_debug(paste("Deleted temp file", temp_path, "after reading."))

  return(x)
}

parse_jailtracker_xml_file_name <- function(name) {
  # The dot before "xml" is escaped so it only matches a literal "."
  parsed <- stringr::str_match(name, "(?<file>\\w*)_(?<timestamp>\\d*)\\.xml$")

  res <- list(
    file_type = parsed[2],
    timestamp = lubridate::ymd_hms(parsed[3], tz = "America/Chicago")
  )

  return(res)
}

upsert_parsed_data_to_db <- function(parsed_data, file_name) {
  # read_ftp_xml() returns NULL on a parsing error; never upsert in that case
  if (is.null(parsed_data)) {
    logger::log_error("No parsed data available, skipping upsert...")
    return(invisible(NULL))
  }

  # Pick the right table to upsert to based on file name
  db_table <- dplyr::case_when(
    file_name == "InmateArrestInfo" ~ "arrest_info",
    file_name == "FacilityArrestingAgencies" ~ "arresting_agencies",
    file_name == "InmateBookingInfo" ~ "booking_info",
    file_name == "InmateCriminalCharges" ~ "charges",
    file_name == "InmateCriminalChargesHistory" ~ "charges_history",
    file_name == "FacilityLayoutCells" ~ "layout_cells",
    file_name == "FacilityLayoutDivisions" ~ "layout_divisions",
    file_name == "ListCategories" ~ "list_categories",
    file_name == "ListItems" ~ "list_items",
    TRUE ~ NA_character_
  )

  # Pick the right primary key based on the file name
  primary_key <- dplyr::case_when(
    file_name == "InmateArrestInfo" ~ "arrest_no",
    file_name == "FacilityArrestingAgencies" ~ "agency_id",
    file_name == "InmateBookingInfo" ~ "arrest_no",
    file_name == "InmateCriminalCharges" ~ "charge_id",
    file_name == "InmateCriminalChargesHistory" ~ "history_id",
    file_name == "FacilityLayoutCells" ~ "cell_id",
    file_name == "FacilityLayoutDivisions" ~ "division_id",
    file_name == "ListCategories" ~ "category_id",
    file_name == "ListItems" ~ "item_id",
    TRUE ~ NA_character_
  )

  logger::log_debug(
    paste("Attempting to upsert", nrow(parsed_data), "rows to", db_table, "table using", primary_key, "primary key...")
  )

  if (nrow(parsed_data) > 0) {
    ojodb <- ojodb::ojo_connect()

    # Upsert into the proper table w/ proper primary key
    dplyr::rows_upsert(
      x = ojodb::ojo_tbl(
        schema = "ocdc_new", # Always the same schema
        table = db_table,
        .con = ojodb
      ),
      y = parsed_data,
      by = primary_key,
      copy = TRUE,
      in_place = TRUE
    )

    logger::log_success(paste("Successfully upserted", format(nrow(parsed_data), big.mark = ","), "rows to", db_table, "table!"))

    # If we just upserted the booking or arrest info...
    if (db_table %in% c("arrest_info", "booking_info")) {
      # Check that the arrest and booking data from today have both been upserted...
      # (order_by takes the bare column name; a quoted string would sort by a constant)
      latest_arrest_data <- ojodb::ojo_tbl(schema = "ocdc_new", table = "arrest_info", .con = ojodb) |>
        dplyr::slice_max(order_by = last_updated_time, n = 1, with_ties = FALSE, na_rm = TRUE) |>
        dplyr::pull("last_updated_time")

      logger::log_debug(paste("Latest arrest data update:", latest_arrest_data))

      latest_booking_data <- ojodb::ojo_tbl(schema = "ocdc_new", table = "booking_info", .con = ojodb) |>
        dplyr::slice_max(order_by = last_updated_time, n = 1, with_ties = FALSE, na_rm = TRUE) |>
        dplyr::pull("last_updated_time")

      logger::log_debug(paste("Latest booking data update:", latest_booking_data))

      # Compare calendar dates directly; comparing a date-time to a Date is unreliable
      arrest_is_today <- isTRUE(lubridate::as_date(latest_arrest_data) == lubridate::today())
      booking_is_today <- isTRUE(lubridate::as_date(latest_booking_data) == lubridate::today())

      # ...then if so, run the daily pop calculation for today and add that to the db.
      if (arrest_is_today && booking_is_today) {
        logger::log_info("Latest arrest and booking data are from today! Beginning daily pop table update...")
        update_daily_pop_table()
      } else {
        logger::log_info("Latest arrest or booking data is not from today, skipping daily pop update...")
      }
    }
  } else {
    logger::log_info("File contains no data, skipping...")
  }
}

read_ftp_xml <- function(raw_xml, table_name) {
  # Preprocess to escape invalid ampersands and correct common tag mismatches
  clean_xml <- stringr::str_replace_all(
    raw_xml,
    c(
      "&(?!amp;|lt;|gt;|apos;|quot;)" = "&amp;", # Match unescaped ampersands
      "<BokingInfo>" = "<BookingInfo>",
      "</BokingInfo>" = "</BookingInfo>"
    )
  )

  # Convert the cleaned XML content to a single string
  clean_xml_string <- paste(clean_xml, collapse = "\n")

  # Find the node we're looking for
  xml_title <- dplyr::case_when(
    table_name == "InmateArrestInfo" ~ "//ArrestInfo",
    table_name == "FacilityArrestingAgencies" ~ "//Agency",
    table_name == "InmateBookingInfo" ~ "//BookingInfo",
    table_name == "InmateCriminalCharges" ~ "//Charge",
    table_name == "InmateCriminalChargesHistory" ~ "//ChargeHistory",
    table_name == "FacilityLayoutCells" ~ "//Cell",
    table_name == "FacilityLayoutDivisions" ~ "//Division",
    table_name == "ListCategories" ~ "//Category",
    table_name == "ListItems" ~ "//Item"
  )

  # Read the cleaned XML from the string.
  # The value of tryCatch() is assigned to `result`, so a parsing error yields NULL.
  result <- tryCatch(
    {
      logger::log_debug("Starting the XML string -> table cleaning process")

      parsed <- xml2::read_xml(clean_xml_string) |>
        xml2::xml_find_all(xml_title) |>
        purrr::map(\(x) {
          xml2::xml_children(x) |>
            purrr::set_names(x |> xml2::xml_children() |> xml2::xml_name()) |>
            purrr::map_chr(xml2::xml_text)
        }) |>
        dplyr::bind_rows() |>
        dplyr::rename_with(snakecase::to_snake_case) |>
        # Empty values to NA
        dplyr::mutate(
          dplyr::across(
            tidyselect::everything(),
            \(x) ifelse(stringr::str_detect(x, "^\\s*$"), NA_character_, x)
          )
        ) |>
        # Replace "01-01-1900" dates with NA
        dplyr::mutate(
          dplyr::across(
            tidyselect::matches("date|time"),
            \(x) ifelse(stringr::str_detect(x, "1/1/1900|1900-01-01"), NA_character_, x)
          )
        ) |>
        # Fix dates
        dplyr::mutate(
          dplyr::across(
            tidyselect::matches("date|time"),
            \(x) lubridate::parse_date_time(
              x,
              orders = c("%m/%d/%Y %I:%M:%S %p", "%m/%d/%Y %H:%M:%S", "mdy"),
              tz = "UTC"
            )
          )
        ) |>
        janitor::remove_empty("rows")

      logger::log_debug("XML string -> table cleaning process complete!")
      parsed
    },
    error = function(e) {
      # If there are any parsing errors at all I don't want it to upsert anything to the db
      logger::log_error("XML parsing error: {e$message}")
      NULL
    }
  )

  if (is.null(result)) {
    return(NULL)
  }

  logger::log_debug("Trying the date / time check...")

  # Check to make sure the date parsing didn't fail
  if (any(stringr::str_detect(names(result), "date|time"))) {
    all_datetimes <- result |>
      dplyr::select(tidyselect::matches("date|time")) |>
      sapply(lubridate::is.POSIXct)

    if (sum(all_datetimes) != length(all_datetimes)) {
      stop("XML parsing error: 'date' or 'time' column is not of type POSIXct!")
    }
  }

  logger::log_debug(paste("Check passed, returning", nrow(result), "rows of data."))

  return(result)
}

# Update the daily_pop table based on the arrest_info and booking_info tables
update_daily_pop_table <- function() {
  # Get current population + booking data for demographics
  in_custody <- ojodb::ojo_tbl(schema = "ocdc_new", table = "arrest_info") |>
    dplyr::filter(
      is.na(final_release_date_time), # Current population only
      record_deleted != "True"
    ) |>
    dplyr::left_join(
      ojodb::ojo_tbl(schema = "ocdc_new", table = "booking_info") |>
        dplyr::filter(record_deleted != "True", record_sealed != "True"),
      by = "arrest_no",
      suffix = c(".arrest", ".booking")
    ) |>
    dplyr::collect()

  # Count people in each demographic category
  # (na.rm = TRUE so a single missing value doesn't turn a whole count into NA)
  daily_pop_today <- in_custody |>
    dplyr::summarise(
      day = lubridate::today() + lubridate::hours(12),
      total_pop = dplyr::n(),
      race_native_pi_pop = sum(race_id == 9481, na.rm = TRUE),
      race_black_pop = sum(race_id == 8853, na.rm = TRUE),
      race_hispanic_pop = sum(race_id == 8855, na.rm = TRUE),
      race_native_american_pop = sum(race_id == 9130, na.rm = TRUE),
      race_white_pop = sum(race_id == 8854, na.rm = TRUE),
      race_asian_pop = sum(race_id == 9482, na.rm = TRUE),
      race_mid_east_pop = sum(race_id == 9432, na.rm = TRUE),
      race_unknown_pop = sum(race_id == 13825, na.rm = TRUE),
      gender_f_pop = sum(sex == "F", na.rm = TRUE),
      gender_m_pop = sum(sex == "M", na.rm = TRUE),
      gender_o_pop = sum(sex != "F" & sex != "M", na.rm = TRUE),
      sec_max_pop = sum(security_level_id == 8084, na.rm = TRUE),
      sec_med_pop = sum(security_level_id == 8083, na.rm = TRUE),
      sec_min_pop = sum(security_level_id == 8082, na.rm = TRUE),
      sec_other_pop = sum(is.na(security_level_id) | !security_level_id %in% c(8082, 8083, 8084))
    )

  logger::log_debug("Upserting population totals now: {jsonlite::toJSON(daily_pop_today, pretty = TRUE)}")

  dplyr::rows_upsert(
    x = ojodb::ojo_tbl(
      schema = "ocdc_new",
      table = "daily_pop"
    ),
    y = daily_pop_today,
    by = "day",
    copy = TRUE,
    in_place = TRUE
  )

  logger::log_success(paste("Successfully updated the daily_pop table! Today's pop:", daily_pop_today$total_pop[1] |> format(big.mark = ",")))
}
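One cleaning step in `read_ftp_xml()` is worth a closer look: escaping stray ampersands so the XML parser doesn't choke on them. Here's a small base R sketch of the same negative-lookahead idea. `escape_stray_ampersands()` and the sample lines are made up for illustration, and the API itself uses `stringr::str_replace_all()` rather than `gsub()`.

```r
# Hypothetical helper: escape any "&" that is not already the start of one of
# the five predefined XML entities. perl = TRUE is needed for the (?! ... ) lookahead.
escape_stray_ampersands <- function(x) {
  gsub("&(?!amp;|lt;|gt;|apos;|quot;)", "&amp;", x, perl = TRUE)
}

# Made-up sample lines, not real jail data
raw_lines <- c(
  "<Agency><Name>Smith & Sons Security</Name></Agency>", # bare ampersand: invalid XML
  "<Agency><Name>A &amp; B Transport</Name></Agency>",   # already escaped: leave alone
  "<Note>count &lt; 10</Note>"                           # other entity: leave alone
)

clean_lines <- escape_stray_ampersands(raw_lines)
print(clean_lines)
```

One caveat with this approach: numeric character references such as `&#38;` aren't in the lookahead list, so they would get double-escaped. That's fine if the export never contains them, which is an assumption on my part.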
2.5 The data: The results
So what data did we actually get?
Our new pipeline covers everything we could access via the old one, now on a daily basis and with no manual labor involved.
Plus, it includes new information we couldn’t reliably access through Jailtracker’s clunky user interface.
Bond amounts and types are one big example – one of CJAC’s priorities has been reducing the number of people being held solely for inability to pay bail. And now we can analyze and track that!
2.6 The data: The results
Our new OCDC schema has ten tables, including daily_pop, which is calculated during the API-triggered data cleaning process and tracks the jail’s daily population counts at noon.
library(gt)   # for gtsave()
library(here) # for here()

new_tables <- ojo_list_tables(schema = "ocdc_new")$table

table_summary <- lapply(new_tables, function(tbl_name) {
  tbl <- ojo_tbl(schema = "ocdc_new", table = tbl_name)
  data.frame(
    table = tbl_name,
    columns = colnames(tbl) |> length(), # paste(colnames(tbl), collapse = ", "),
    n_rows = tbl |> summarise(n = n()) |> collect() |> pull(n),
    stringsAsFactors = FALSE
  )
})

# Named `pipeline_summary` to avoid masking base R's summary()
pipeline_summary <- do.call(rbind, table_summary) |>
  arrange(desc(n_rows))

# Let's make a fancy table with a styling package I wrote, then take a look.
library(ojothemes)

summary_table <- gt_okpi(pipeline_summary, title = "New CJAC Data Pipeline Summary") |>
  ojo_gt_captions(analyst_name = "Andrew Bell", source = "ocdc")

gtsave(summary_table, here("media", "new-pipeline-summary.png"))
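To make the `daily_pop` calculation a little more concrete, here's a toy base R version of the counting step. The five-row `in_custody` data frame and the `count_where()` helper are made up for this example; only the security level IDs (8082, 8083, 8084) come from the pipeline code.

```r
# Made-up stand-in for the people currently in custody
in_custody <- data.frame(
  arrest_no = 1:5,
  sex = c("F", "M", "M", NA, "X"),
  security_level_id = c(8084, 8083, NA, 8082, 9999)
)

# Hypothetical helper: count rows meeting a condition, ignoring missing values
count_where <- function(condition) sum(condition, na.rm = TRUE)

known_levels <- c(8082, 8083, 8084) # min, med, max

daily_pop <- data.frame(
  total_pop = nrow(in_custody),
  gender_f_pop = count_where(in_custody$sex == "F"),
  gender_m_pop = count_where(in_custody$sex == "M"),
  sec_max_pop = count_where(in_custody$security_level_id == 8084),
  # "Other" covers both missing and unrecognized security levels
  sec_other_pop = count_where(
    is.na(in_custody$security_level_id) |
      !in_custody$security_level_id %in% known_levels
  )
)

print(daily_pop)
```

The sketch uses `na.rm = TRUE` because a plain `sum()` over a logical vector returns `NA` as soon as one value is missing, which would blank out that day's count.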
2.7 The data: The results
We now have all this data – including the new variables I’ve been after for years – spanning back decades and updating itself on a daily basis!
2.8 The data: The dashboard
I’ve also been able to break ground on the new dashboard:
2.10 The data: The dashboard
It’s not deployed anywhere yet, so I can’t share it with y’all directly – getting a test deployment up is next on my to-do list – but I can show you my in-development version now.
(This is Andrew’s cue to do that now)
3 Final thoughts
3.1 Final thoughts: CJAC 2.0
This project involved many of the same tasks I’d be doing in the RSA position:
Communicating with clients / jail officials / contractors
Also working around the limitations they impose
Data engineering tasks
Proprietary jail software ➡ GCloud ➡ R / SQL ➡ Postgres ➡ Data products
Also involved lots of cleaning, wrangling of messy jail data
Data analysis / visualization tasks
The dashboard, the automated reporting, ad-hoc analyses as needed
This will also include presenting the new dashboard / reports to CJAC and their new Director at an upcoming meeting.
3.2 Final thoughts: Thanks for your time!
I know I covered a lot of ground, so I would be happy to answer any questions y’all have or elaborate on any part of this project!
It’s still a work-in-progress, but I think it demonstrates the depth and breadth of my skills and how they would apply to the RSA position with CSG.