CSG Justice Center

RSA Interview

Project Presentation

Andrew Bell

Oklahoma City, OK

1 The project

My work with CJAC and the Oklahoma County Detention Center

1.1 The project: What is CJAC?

OK Policy provides data services to the Oklahoma County Criminal Justice Advisory Council (CJAC). I’ve been in charge of fulfilling that contract for 4+ years.

  • CJAC is a community oversight task force founded to research ways to improve the overcrowded, dangerous conditions at OKC’s local jail, the Oklahoma County Detention Center (OCDC).
  • They advise the Jail Trust, which was created to take control of OCDC away from the Sheriff when conditions deteriorated to the point of crisis in 2019.

1.3 The project: My role with CJAC

My role: CJAC’s go-to data person

  • I developed and maintain a Shiny dashboard for jail data.
  • I also provide custom reports and datasets, answer research questions, and so on for CJAC’s Director and members.
    • Including: Oklahoma County’s DA, members of our Public Defender’s office, court officials, etc.
    • I’ve also been able to provide this data to others beyond CJAC, like the OKC Diversion Hub and Homeless Alliance.

1.4 The project: CJAC Version 2.0

One of my first tasks when I started in 2020 was building our current CJAC data pipeline / dashboard.

  • We store everything in our own database, so we can access it easily with our R package.
  • My long, ongoing, nearly-complete project to overhaul and improve this pipeline is what I’m going to show you today!
    • First, I’ll show you a bit more of what things looked like originally.

1.5 The project: The CJAC Pipeline

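# This is what our R package for accessing the data looks like in action
library(ojodb)
# Loaded explicitly so the example runs even if ojodb does not attach them
library(dplyr)     # select(), filter(), count()
library(lubridate) # floor_date()
library(ggplot2)   # ggplot(), geom_col()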
ocdc_data <- ojo_tbl(schema = "ocdc", table = "arrest") |>
  select(-c(starts_with("def_"), fbi_num, state_id)) |>
  filter(book_date >= "2016-01-01") |> # Very SQL-like syntax
  ojo_collect()

# How many people were booked into the jail each year?
ocdc_data |>
  count(year = floor_date(book_date, "years")) |>
  ggplot(aes(x = year, y = n)) +
  geom_col()

1.6 Old Dashboard

You can see Version 1.0 of the dashboard here.

1.7 The project: CJAC Version 2.0

The main limitation of Version 1.0: we had to pull reports by hand from the jail’s clunky, limited data system to get to the data.

  • I’ve always wanted to overhaul things, but getting Jailtracker (the contractor managing the jail’s data system) to help us set up a direct connection was difficult.
    • After a very long, often one-sided email thread, I finally broke through late last year, and now version 2.0 of the CJAC pipeline is nearing completion!

1.8 Objective & Motivation

Objective:

  • Create a new, fully automated pipeline with richer data, and a dashboard to go with it

Motivation:

  • Identify problems and enable better decision-making on a daily basis in the jail & court system
  • Open up data to public oversight
  • Help guide CJAC’s future in Oklahoma County

2 The data

2.1 The data: Jailtracker ➡ GCS

Jailtracker still wouldn’t give us direct access to the database itself, but they did offer to give us a daily export of the tables we needed via File Transfer Protocol (FTP).

  • To start, I began poring through their schema to determine which tables we’d need for our pipeline.
  • After a little back-and-forth, we eventually got the FTP server and the export set up. We use Google Cloud, so we set up our FTP server to dump the files into a GCS Bucket.

2.3 The data: GCS ➡ Database

When new files hit the GCS Bucket each day, they trigger an API I wrote with {plumber} in R and deployed via Google Cloud Run/Build.

  • This API then parses the data (which arrives in XML format), performs some cleaning and quality checks, then upserts the data to the proper place in our database.
  • The result is a mirrored image of the database tables we’re interested in, updated daily at around 12pm.
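
For anyone unfamiliar with the term, here is a minimal sketch of what an "upsert" does, using dplyr's rows_upsert() on two small in-memory tables. The tables and values below are invented for illustration. In the real pipeline the same function runs against a database table rather than a local tibble.

```r
library(dplyr)

# Hypothetical snapshot of a table already in the database
existing <- tibble(
  charge_id = c(1L, 2L),
  status    = c("Pending", "Pending")
)

# Hypothetical rows from today's export: one changed row, one new row
incoming <- tibble(
  charge_id = c(2L, 3L),
  status    = c("Dismissed", "Pending")
)

# Rows whose key already exists are updated; all other rows are inserted
updated <- rows_upsert(existing, incoming, by = "charge_id")
print(updated)
```

Because rows are matched on a key, re-sending a file that was already processed should leave the table unchanged instead of creating duplicates.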

2.4 The data: GCS ➡ Database

Here’s my API for accomplishing all of this, in case you’re curious!

library(plumber)
library(jsonlite)
library(logger)
library(googleCloudStorageR)
library(gargle)
library(dplyr)
library(readr)
library(lubridate)
library(janitor)
library(purrr)
library(xml2)
library(stringr)
library(fs)
library(snakecase)

config <- config::get()

Sys.setenv("OJO_HOST" = config$database$server)
Sys.setenv("OJO_PORT" = config$database$port)
Sys.setenv("OJO_DEFAULT_USER" = config$database$uid)
Sys.setenv("OJO_DEFAULT_PASS" = config$database$pwd)
Sys.setenv("OJO_SSL_MODE" = "verify-ca")
Sys.setenv("OJO_SSL_ROOT_CERT" = config$database$ssl.ca)
Sys.setenv("OJO_SSL_CERT" = config$database$ssl.cert)
Sys.setenv("OJO_SSL_KEY" = config$database$ssl.key)

# Get a token that has permission to interact with GCS and authorize
scope <- c("https://www.googleapis.com/auth/cloud-platform")
token <- gargle::token_fetch(scopes = scope)

gcs_auth(token = token)

#* Catch GCP Eventarc Events
#* @post /
function(req) {
  # Store the response so we can return it after all other logic
  res <- tryCatch(
    {
      # Extract structured fields from eventarc json body
      extracted_data <- list(
        id = req$body$id,
        bucket = req$body$bucket,
        name = req$body$name,
        generation = req$body$generation,
        content_type = req$body$contentType,
        created_at = req$body$timeCreated
      )

      logger::log_info(paste("Request Received: Eventarc JSON payload: {extracted_data$name}"))

      # In-memory XML object
      xml_object <- get_jailtracker_xml(extracted_data$name)

      # List containing `file_type` and `timestamp`
      xml_object_metadata <- parse_jailtracker_xml_file_name(extracted_data$name)

      if (is.null(xml_object)) {
        # Nothing came back from GCS, so skip parsing and upserting entirely
        logger::log_error("Failed to read XML data from GCS!")
        res <- list(
          message = "Failed to read XML data from GCS!",
          error = "Failed to read XML data from GCS"
        )
      } else {
        # Pass args to read_ function to get parsed data in nice tibble format
        parsed_data <- read_ftp_xml(raw_xml = xml_object,
                                    table_name = xml_object_metadata$file_type)

        # Pass parsed tibble to upsert function
        upsert_parsed_data_to_db(
          parsed_data = parsed_data,
          file_name = xml_object_metadata$file_type
        )

        # Attach needed fields to the response
        res <- list(
          message = "Successfully parsed the Eventarc JSON payload",
          payload = extracted_data
        )
      }

      # Log our response
      logger::log_info("{res}")

      # Make `res` the value of the tryCatch block. Calling return() here
      # would exit the endpoint handler itself, not just the tryCatch.
      res
    },
    error = \(e) {
      # If we fail to parse just log it
      res <- list(
        message = "Failed to parse JSON payload from Eventarc",
        error = e$message
      )

      logger::log_error("{res}")

      # This returns to the `res` variable NOT the endpoint handler
      return(res)
    }
  )

  # Clear out old or unexpected files from the bucket, or log failures
  # prune_bucket() # removed for now

  # Return the response we made in the tryCatch
  return(res)
}

prune_bucket <- function(bucket = "jailtracker-sftp", max_age = 30) {
  threshold <- lubridate::now() - lubridate::days(max_age)

  # Get all objects that are older than max age or not xml files
  all_objects <- gcs_list_objects(
    bucket = bucket,
    detail = "more"
  ) |>
    dplyr::as_tibble()

  n_all_objects <- nrow(all_objects)

  if(n_all_objects > 0){
    objects <- all_objects |>
      dplyr::filter(
        (updated < threshold) | (!stringr::str_detect(contentType, "/xml"))
      )
  } else {
    objects <- tibble::tibble()
  }

  n_objects <- nrow(objects)

  # Only delete if we get a match, else skip to log
  if (n_objects > 0) {
    logger::log_debug("Cleanup: Deleting {n_objects} object(s) from bucket")

    # Iterate over the names, trying to delete each one
    objects$name |>
      purrr::walk(
        \(x) {
          # Delete object returns a boolean, true if successful
          success <- gcs_delete_object(
            object_name = x,
            bucket = bucket
          )

          if (success) {
            logger::log_debug("Deleted object: {x}")
          } else {
            logger::log_error("Failed to delete object: {x}")
          }
        }
      )
  } else {
    logger::log_info("Cleanup: No objects older than {max_age} day(s) or of unexpected content type")
  }
}

get_jailtracker_xml <- function(name, bucket = "jailtracker-sftp") {

  temp_path <- paste0("~/temp-", name, ".xml")

  # Save the temp file path as the saveToDisk argument
  gcs_get_object(
    object_name = name,
    bucket = bucket,
    saveToDisk = temp_path, # Save the xml file to the disk
    parseObject = FALSE,
    overwrite = TRUE
  )

  # Check for success
  if(!fs::file_exists(temp_path)){
    # Log before returning, otherwise the log call is never reached
    logger::log_error("Failed to get object from GCS and save as temp file.")
    return(NULL)
  }

  # Use read_lines on the temp file path
  x <- readr::read_lines(temp_path)

  # If all has worked, delete the temp file and return the readr parsed object
  if(fs::file_exists(temp_path)){
    fs::file_delete(temp_path)
    logger::log_debug(paste("Deleted temp file", temp_path, "after reading."))
  }

  return(x)

}

parse_jailtracker_xml_file_name <- function(name) {
  # Escape the dot so it only matches a literal ".xml" extension
  parsed <- stringr::str_match(name, "(?<file>\\w*)_(?<timestamp>\\d*)\\.xml$")

  res <- list(
    file_type = parsed[2],
    timestamp = lubridate::ymd_hms(parsed[3], tz = "America/Chicago")
  )

  return(res)
}


upsert_parsed_data_to_db <- function(parsed_data, file_name) {

  # Pick the right table to upsert to based on file name
  db_table <- dplyr::case_when(
    file_name == "InmateArrestInfo" ~ "arrest_info",
    file_name == "FacilityArrestingAgencies" ~ "arresting_agencies",
    file_name == "InmateBookingInfo" ~ "booking_info",
    file_name == "InmateCriminalCharges" ~ "charges",
    file_name == "InmateCriminalChargesHistory" ~ "charges_history",
    file_name == "FacilityLayoutCells" ~ "layout_cells",
    file_name == "FacilityLayoutDivisions" ~ "layout_divisions",
    file_name == "ListCategories" ~ "list_categories",
    file_name == "ListItems" ~ "list_items",
    TRUE ~ NA_character_
  )

  # Pick the right primary key based on the file name
  primary_key <- dplyr::case_when(
    file_name == "InmateArrestInfo" ~ "arrest_no",
    file_name == "FacilityArrestingAgencies" ~ "agency_id",
    file_name == "InmateBookingInfo" ~ "arrest_no",
    file_name == "InmateCriminalCharges" ~ "charge_id",
    file_name == "InmateCriminalChargesHistory" ~ "history_id",
    file_name == "FacilityLayoutCells" ~ "cell_id",
    file_name == "FacilityLayoutDivisions" ~ "division_id",
    file_name == "ListCategories" ~ "category_id",
    file_name == "ListItems" ~ "item_id",
    TRUE ~ NA_character_
  )

  logger::log_debug(
    paste("Attempting to upsert", nrow(parsed_data), "rows to",
          db_table, "table using", primary_key,
          "primary key...")
  )

  if(nrow(parsed_data) > 0){

    ojodb <- ojodb::ojo_connect()
    # Upsert into the proper table w/ proper primary key
    dplyr::rows_upsert(
      x = ojodb::ojo_tbl(
        schema = "ocdc_new", # Always the same schema
        table = db_table,
        .con = ojodb
      ),
      y = parsed_data,
      by = primary_key,
      copy = TRUE,
      in_place = TRUE
    )
    logger::log_success(paste("Successfully upserted",
                              format(nrow(parsed_data), big.mark = ","),
                              "rows to", db_table,
                              "table!"))

    # If we just upserted the booking or arrest info...
    if(db_table %in% c("arrest_info", "booking_info")){
      # Check that the arrest and booking data from today have both been upserted...
      latest_arrest_data <- ojodb::ojo_tbl(schema = "ocdc_new",
                                           table = "arrest_info",
                                           .con = ojodb) |>
        # Unquoted column name: a quoted string would be treated as a constant
        dplyr::slice_max(order_by = last_updated_time,
                         n = 1,
                         with_ties = FALSE,
                         na_rm = TRUE) |>
        dplyr::pull("last_updated_time")

      logger::log_debug(paste("Latest arrest data update:", latest_arrest_data))

      latest_booking_data <- ojodb::ojo_tbl(schema = "ocdc_new",
                                            table = "booking_info",
                                            .con = ojodb) |>
        # Unquoted column name: a quoted string would be treated as a constant
        dplyr::slice_max(order_by = last_updated_time,
                         n = 1,
                         with_ties = FALSE,
                         na_rm = TRUE) |>
        dplyr::pull("last_updated_time")

      logger::log_debug(paste("Latest booking data update:", latest_booking_data))

      # ...then if so, run the daily pop calculation for today and add that to the db.
      # Compare Date to Date (as_date() drops the time of day), using scalar &&
      if(lubridate::as_date(latest_arrest_data) == lubridate::today() &&
         lubridate::as_date(latest_booking_data) == lubridate::today()) {

        logger::log_info("Latest arrest and booking data are from today! Beginning daily pop table update...")

        update_daily_pop_table()

      } else {
        logger::log_info("Latest arrest or booking data is not from today, skipping daily pop update...")
      }

    }
  } else {
    logger::log_info("File contains no data, skipping...")
  }
}

read_ftp_xml <- function(raw_xml, table_name) {

  # Preprocess to escape invalid ampersands and correct common tag mismatches
  clean_xml <- stringr::str_replace_all(
    raw_xml,
    c(
      "&(?!amp;|lt;|gt;|apos;|quot;)" = "&amp;", # Match unescaped ampersands
      "<BokingInfo>" = "<BookingInfo>",
      "</BokingInfo>" = "</BookingInfo>"
    )
  )

  # Convert the cleaned XML content to a single string
  clean_xml_string <- paste(clean_xml, collapse = "\n")

  # Find the file we're looking for
  xml_title <- dplyr::case_when(
    table_name == "InmateArrestInfo" ~ "//ArrestInfo",
    table_name == "FacilityArrestingAgencies" ~ "//Agency",
    table_name == "InmateBookingInfo" ~ "//BookingInfo",
    table_name == "InmateCriminalCharges" ~ "//Charge",
    table_name == "InmateCriminalChargesHistory" ~ "//ChargeHistory",
    table_name == "FacilityLayoutCells" ~ "//Cell",
    table_name == "FacilityLayoutDivisions" ~ "//Division",
    table_name == "ListCategories" ~ "//Category",
    table_name == "ListItems" ~ "//Item"
  )

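  # Default to NULL so `result` still exists (and nothing gets upserted)
  # if parsing fails; assignments inside the error handler are local to it
  result <- NULL

  # Read the cleaned XML from the string
  tryCatch({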

    logger::log_debug("Starting the XML string -> table cleaning process")

    result <- xml2::read_xml(clean_xml_string) |>
      xml2::xml_find_all(xml_title) |>
      purrr::map(\(x) {
        xml2::xml_children(x) |>
          purrr::set_names(x |> xml2::xml_children() |> xml2::xml_name()) |>
          purrr::map_chr(xml2::xml_text)
      }) |>
      dplyr::bind_rows() |>
      dplyr::rename_with(snakecase::to_snake_case) |>
      # Empty values to NA
      dplyr::mutate(
        dplyr::across(tidyselect::everything(),
                      \(x) ifelse(stringr::str_detect(x, "^\\s*$"), NA_character_, x)
        )
      ) |>
      # Replace "01-01-1900" dates with NA
      dplyr::mutate(
        dplyr::across(tidyselect::matches("date|time"),
                      \(x) ifelse(stringr::str_detect(x, "1/1/1900|1900-01-01"), NA_character_, x)
        )
      ) |>
      # Fix dates
      dplyr::mutate(
        dplyr::across(tidyselect::matches("date|time"),
                      \(x) lubridate::parse_date_time(x,
                                                      orders = c("%m/%d/%Y %I:%M:%S %p", "%m/%d/%Y %H:%M:%S", "mdy"),
                                                      tz = "UTC")
        )
      ) |>
      janitor::remove_empty("rows")

    logger::log_debug("XML string -> table cleaning process complete!")

  }, error = function(e) {
    # If there are any parsing errors at all I don't want it to upsert anything to the db
    message("XML parsing error: ", e$message)
    result <- NULL
  })

  logger::log_debug("Trying the date / time check...")

  # Check to make sure the date parsing didn't fail
  if (!is.null(result) & any(stringr::str_detect(names(result), "date|time"))) {
    all_datetimes <- result |>
      dplyr::select(tidyselect::matches("date|time")) |>
      sapply(lubridate::is.POSIXct)
    if (sum(all_datetimes) != length(all_datetimes)) {
      result <- NULL
      stop("XML parsing error: 'date' or 'time' column is not of type POSIXct!")
    }
  }

  logger::log_debug(paste("Check passed, returning", nrow(result), "rows of data."))

  return(result)
}

# Update the daily_pop table based on the arrest_info and booking_info tables
update_daily_pop_table <- function(){

  # Get current population + booking data for demographics
  in_custody <- ojodb::ojo_tbl(schema = "ocdc_new",
                               table = "arrest_info") |>
    dplyr::filter(is.na(final_release_date_time), # Current population only
                  record_deleted != "True") |>
    dplyr::left_join(
      ojodb::ojo_tbl(schema = "ocdc_new",
                     table = "booking_info") |>
        dplyr::filter(record_deleted != "True",
                      record_sealed != "True"),
      by = c("arrest_no" = "arrest_no"),
      suffix = c(".arrest", ".booking")
    ) |>
    dplyr::collect()

  # Count people in each demographic category
  daily_pop_today <- in_custody |>
    dplyr::summarise(
      day = lubridate::today() + lubridate::hours(12),
      total_pop = dplyr::n(),
      race_native_pi_pop = sum(race_id == 9481),
      race_black_pop = sum(race_id == 8853),
      race_hispanic_pop = sum(race_id == 8855),
      race_native_american_pop = sum(race_id == 9130),
      race_white_pop = sum(race_id == 8854),
      race_asian_pop = sum(race_id == 9482),
      race_mid_east_pop = sum(race_id == 9432),
      race_unknown_pop = sum(race_id == 13825),
      gender_f_pop = sum(sex == "F"),
      gender_m_pop = sum(sex == "M"),
      gender_o_pop = sum(sex != "F" & sex != "M"),
      sec_max_pop = sum(security_level_id == 8084),
      sec_med_pop = sum(security_level_id == 8083),
      sec_min_pop = sum(security_level_id == 8082),
      sec_other_pop = sum(is.na(security_level_id) | !security_level_id %in% c(8082, 8083, 8084))
    )

  logger::log_debug("Upserting population totals now: {jsonlite::toJSON(daily_pop_today, pretty = TRUE)}")

  dplyr::rows_upsert(
    x = ojodb::ojo_tbl(
      schema = "ocdc_new",
      table = "daily_pop"
    ),
    y = daily_pop_today,
    by = "day",
    copy = TRUE,
    in_place = TRUE
  )

  logger::log_success(paste("Successfully updated the daily_pop table! Today's pop:",
                            daily_pop_today$total_pop[1] |> format(big.mark = ",")))

}
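
One cleaning step from read_ftp_xml() above is worth isolating: escaping stray ampersands so the XML parser doesn't choke on them. Here's a minimal base R sketch of the same pattern. The helper name escape_bare_ampersands() and the sample lines are made up for illustration and are not part of the production API.

```r
# Escape ampersands that are not already part of one of the five
# predefined XML entities (same pattern as in read_ftp_xml() above)
escape_bare_ampersands <- function(lines) {
  gsub("&(?!amp;|lt;|gt;|apos;|quot;)", "&amp;", lines, perl = TRUE)
}

# Made-up example lines, not real jail data
raw <- c(
  "<Agency><Name>Smith & Sons Bail Bonds</Name></Agency>",
  "<Agency><Name>A &amp; B Transport</Name></Agency>"
)

cleaned <- escape_bare_ampersands(raw)
print(cleaned)
```

The negative lookahead leaves already-escaped entities alone, so the second line passes through unchanged. One caveat of this pattern is that numeric character references such as `&#39;` would also be escaped, so it assumes the export never contains them.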

2.5 The data: The results

So what data did we actually get?

  • Our new pipeline covers everything we could access via the old one, now on a daily basis and with no manual labor involved.
    • Plus, it includes new information we couldn’t reliably access through Jailtracker’s clunky user interface.
    • Bond amounts and types are one big example – one of CJAC’s priorities has been reducing the number of people being held solely for inability to pay bail. And now we can analyze and track that!

2.6 The data: The results

Our new OCDC schema has ten tables, including daily_pop, which is calculated during the API-triggered data cleaning process and tracks the jail’s daily population counts at noon.

new_tables <- ojo_list_tables(schema = "ocdc_new")$table
table_summary <- lapply(new_tables, function(tbl_name) {
  tbl <- ojo_tbl(schema = "ocdc_new", table = tbl_name)
  data.frame(
    table = tbl_name,
    columns = colnames(tbl) |> length(), # paste(colnames(tbl), collapse = ", "),
    n_rows = tbl |> summarise(n = n()) |> collect() |> pull(n),
    stringsAsFactors = FALSE
  )
})
summary <- do.call(rbind, table_summary) |> arrange(desc(n_rows))

# Let's make a fancy table with a styling package I wrote, then take a look.
library(ojothemes)
library(gt)   # gtsave()
library(here) # here()
summary_table <- gt_okpi(summary, title = "New CJAC Data Pipeline Summary") |>
  ojo_gt_captions(analyst_name = "Andrew Bell", source = "ocdc")
gtsave(summary_table, here("media", "new-pipeline-summary.png"))
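
To give a sense of what goes into daily_pop, here's a minimal sketch of the counting step on made-up data. The column names mirror the ones in the API code, but the rows are invented, and this is a simplified illustration, not the production calculation.

```r
# Made-up records; a missing release time means the person is still in custody
arrests <- data.frame(
  arrest_no = c(101, 102, 103, 104),
  sex = c("F", "M", "M", NA),
  final_release_date_time = as.POSIXct(
    c(NA, "2024-01-05 09:30:00", NA, NA), tz = "UTC"
  )
)

# Current population only: keep rows with no release time
in_custody <- arrests[is.na(arrests$final_release_date_time), ]

# One summary row per day; na.rm = TRUE keeps a missing value from
# turning the whole count into NA
daily_pop <- data.frame(
  total_pop    = nrow(in_custody),
  gender_f_pop = sum(in_custody$sex == "F", na.rm = TRUE),
  gender_m_pop = sum(in_custody$sex == "M", na.rm = TRUE)
)
print(daily_pop)
```

Note that in R, sum() over a comparison returns NA if any value is missing, which is why the sketch passes na.rm = TRUE.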

2.7 The data: The results

We now have all this data – including the new variables I’ve been after for years – spanning back decades and updating itself on a daily basis!

2.8 The data: The dashboard

I’ve also been able to break ground on the new dashboard:

2.10 The data: The dashboard

It’s not deployed anywhere yet, so I can’t share it with y’all directly – getting a test deployment up is next on my to-do list – but I can show you my in-development version now.



(This is Andrew’s cue to do that now)

3 Final thoughts

3.1 Final thoughts: CJAC 2.0

This project involved many of the same tasks I’d be doing in the RSA position:

  • Communicating with clients / jail officials / contractors
    • Also working around the limitations they impose
  • Data engineering tasks
    • Proprietary jail software ➡ GCloud ➡ R / SQL ➡ Postgres ➡ Data products
    • Also involved lots of cleaning and wrangling of messy jail data
  • Data analysis / visualization tasks
    • The dashboard, the automated reporting, ad-hoc analyses as needed
    • This will also include presenting the new dashboard / reports to CJAC and their new Director at an upcoming meeting.

3.2 Final thoughts: Thanks for your time!

I know I covered a lot of ground, so I would be happy to answer any questions y’all have or elaborate on any part of this project!

  • It’s still a work-in-progress, but I think it demonstrates the depth and breadth of my skills and how they would apply to the RSA position with CSG.

Thanks for listening!

4 Contact Info

Andrew Bell

andrewjbell.ab@gmail.com

Oklahoma City, OK