Step 1: 载入所需的包

library(data.table)
library(dplyr)
library(rgdal)
library(rgeos)
library(FNN)
library(fastcluster)

Step 2: 读取数据

route <- fread("/home/public/data/BUS_ROUTE_DIC.csv")

gps <- fread("/home/public/data/GPS_DATA.csv")
# 读取gps数据
gps <- gps[lng > 113.766 & lng < 115.617 & lat > 22.45 & lat < 23.867, ]
# 深圳位于东经113°46'~114°37',北纬22°27'~22°52', 因此排除范围外的数据
sum(duplicated(gps[, .(bus_id, day, time)]))
# 判断是否有重复数据
gps <- unique(gps, by = c("bus_id", "day", "time"), fromFirst = TRUE)
# 删除重复数据

# gps总计57900167条数据

afc <- fread("/home/public/data/AFC_DATA.csv")
# 读取afc数据
sum(duplicated(afc[, .(card_id, day, time)]))
# 判断是否有重复数据
afc <- unique(afc, by = c("card_id", "day", "time"), fromFirst = TRUE)
# 删除重复数据
afc <- left_join(afc, route, by = "bus_id")
# 将公交线路字段加入到afc中

# afc总计20036579条数据

Step 3: 准备Task1数据,即计算afc中每条刷卡数据所对应的公交车位置

setkey(gps, bus_id, day, time)
# 设置gps的主键
setkey(afc, bus_id, day, time)
# 设置afc的主键

gps[, `:=`(timeUp, time)]
setnames(gps, c("lng", "lat"), c("lngUp", "latUp"))
afc <- gps[afc, roll = -Inf]
# 将最接近刷卡时间的公交车辆gps中的时间和位置加入到afc中(迟于或等于刷卡时间),分别为timeUp,
# lngUp, latUp字段

setnames(gps, c("lngUp", "latUp", "timeUp"), c("lngDown", "latDown", "timeDown"))
afc <- gps[afc, roll = Inf]
# 将最接近刷卡时间的公交车辆gps中的时间和位置加入到afc中(早于或等于刷卡时间),分别为timeDown,
# lngDown, latDown字段

afc[, `:=`(c("timeUp", "timeDown"), list(timeUp - time, time - timeDown))]
# 重新计算timeUp, timeDown字段为与刷卡时间的差值

setnames(gps, c("lngDown", "latDown"), c("lng", "lat"))
gps[, `:=`(timeDown, NULL)]
# 还原gps数据到初始状态,消除为链接gps时间和afc时间对gps数据所做的修改。

getGeo <- function(timeDown, lngDown, latDown, timeUp, lngUp, latUp) {
    if (!is.na(timeDown) & !is.na(timeUp)) {
        if (timeDown == 0) {
            lng <- lngDown
            lat <- latDown
            trust <- 0
            return(c(lng, lat, trust))
        } else {
            lng <- (timeUp * lngDown + timeDown * lngUp)/(timeUp + timeDown)
            lat <- (timeUp * latDown + timeDown * latUp)/(timeUp + timeDown)
            trust <- (timeUp + timeDown)/2
            return(c(lng, lat, trust))
        }
    }
    # 对应timeDown和timeUp均不为空值的情况(大多数情况)
    if (!is.na(timeDown) & is.na(timeUp)) {
        lng <- lngDown
        lat <- latDown
        trust <- timeDown^2
        return(c(lng, lat, trust))
    }
    # 对应timeDown不为空值但timeUp为空值的情况(没有迟于刷卡时间的公交车辆gps数据的情况)
    if (is.na(timeDown) & !is.na(timeUp)) {
        lng <- lngUp
        lat <- latUp
        trust <- timeUp^2
        return(c(lng, lat, trust))
    }
    # 对应timeDown为空值但timeUp不为空值的情况(没有早于刷卡时间的公交车辆gps数据的情况)
    lng <- NA
    lat <- NA
    trust <- NA
    return(c(lng, lat, trust))
    # 对应timeDown为空值但timeUp均为空值的情况
}
# 定义使用afc中每条刷卡数据的timeDown, lngDown, latDown, timeUp, lngUp,
# latUp字段计算该条刷卡数据所对应的公交车位置和置信度的方法

afcFull <- mapply(getGeo, afc$timeDown, afc$lngDown, afc$latDown, afc$timeUp, 
    afc$lngUp, afc$latUp)
afc[, `:=`(c("lng", "lat", "trust"), list(afcFull[1, ], afcFull[2, ], afcFull[3, 
    ]))]
# 将getGeo方法逐行应用于afc数据中,通过使用mapply能够避免使用for循环,能大幅提高程序运行速度

rm(afcFull)
# 删除中间数据以节省内存

Step 4: 通过afc中相邻刷卡时间的间隔初步判断可能的中途站和首末站

afc[, `:=`(timeLag, lag(time, 1L)), by = .(bus_id, day)]
# afc数据已按bus_id和day分组,并按time排序后,将上一条刷卡数据的时间作为下一条刷卡数据的timeLag字段

afc[, `:=`(c("stopDiff", "dirDiff"), list(ifelse(time - timeLag >= 90, 1, 0), 
    ifelse(time - timeLag >= 480, 1, 0)))]
# 如果上下条刷卡数据时间间隔大于90s,后者可能为下一个站点,标记stopDiff为1;
# 如果上下条刷卡数据时间间隔大于480s,后者可能为首末站点,标记dirDiff为1

afc[is.na(stopDiff), `:=`(stopDiff, 0)]
afc[, `:=`(stopIndex, cumsum(stopDiff)), by = .(bus_id, day)]
# 根据stopDiff字段生成stopIndex字段,初步得到每一条数据的站点编号

Step 5: 通过投影得到afc数据的投影坐标

afcSpatial <- afc[!is.na(trust), ]
# 排除afc中没有坐标的数据,得到新的数据afcSpatial,总计19030638条数据

afcSpatial <- afcSpatial[, `:=`(c("lngDown", "latDown", "timeDown", "lngUp", 
    "latUp", "timeUp"), list(NULL, NULL, NULL, NULL, NULL, NULL))]
# 删除afcSpatial中的lngDown、latDown等6个字段

afcSpatial <- data.frame(afcSpatial, x = afcSpatial$lng, y = afcSpatial$lat)
coordinates(afcSpatial) <- c("x", "y")
proj4string(afcSpatial) <- CRS("+init=epsg:4326 +proj=longlat +ellps=WGS84 + datum=WGS84")
afcSpatial <- spTransform(afcSpatial, CRS("+init=epsg:2414 +datum=WGS84"))
afcSpatial <- data.table(coordinates(afcSpatial), afcSpatial@data)
# 对afcSpatial进行投影得到投影坐标分别记为x、y字段

Step 6: 利用Step4中初步判断的首末站点,通过聚类得到首末站点的坐标

getClustSE <- function(routeStop) {
    routeStopClust <- hclust(dist(data.frame(x = routeStop[, x], y = routeStop[, 
        y])), method = "complete")
    routeStopClust <- cutree(routeStopClust, h = 300)
    return(routeStopClust)
}
# 定义利用Step4中初步判断的首末站点,按照300m的距离,进行聚类的方法

afcSE <- afcSpatial[dirDiff == 1, ]
# 从afcSpatial中分离出可能的首末站点,得到新的数据afcSE

afcSENew <- data.table()
for (i in 1:500) {
    afcSEOne <- afcSE[route_id == i, ]
    afcSEClust <- data.table(Clust = getClustSE(afcSEOne))
    afcSEOne <- cbind(afcSEOne, afcSEClust)
    afcSEClust[, `:=`(Count, .N), by = Clust]
    afcSEClust <- afcSEClust[, .(Count = mean(Count)), by = Clust]
    afcSEClust[, `:=`(Rank, rank(-Count, ties.method = "min"))]
    afcSENew <- rbind(afcSENew, merge(afcSEOne, afcSEClust, by = "Clust"))
}
# 按照线路进行循环,将afcSE数据逐线路聚类,得到afcSENew数据,afcSENew在afcSE基础上新增了3个字段,Clust为该站点按照300m进行聚类后所对应的类,Count为该站点所在的类中包含的站点总量,Rank为按照站点数量对该类的排名

afcSENew <- afcSENew[Rank < 3, .(lng = mean(lng), lat = mean(lat), x = mean(x), 
    y = mean(y)), by = .(route_id, Rank)]
# 将每一条线路中Rank排名前两位的类作为首末站点的类,并对这两个类的坐标进行平均之后得到首末站点的坐标
# 通过上述聚类可以排除在Step4中一些由于相邻刷卡间隔时间较长而被误认为首末站点的中途站点对首末站点坐标计算产生的干扰

Step 7: 利用Step6中得到的首末站点坐标,通过计算最短距离的方法找出首末站点的刷卡数据,进而标记所有刷卡数据的上下行方向

getSE <- function(Route_id, X, Y) {
    Near <- get.knnx(data.frame(X, Y), afcSENew[route_id == Route_id, list(x, 
        y)], k = 1)
    Dis <- min(Near[[2]])
    return(ifelse(Dis < 300, 1, 0))
}
# 定义在已知该条线路首末站点的坐标的基础上,根据距离判断是首末站点的刷数据的方法

dirIndex <- mapply(getSE, afcSpatial$route_id, afcSpatial$x, afcSpatial$y)
afcSE <- cbind(afcSpatial[, list(bus_id, day, time)], dirIindex)
afcSE <- afcSE[dirIndex == 1, ]
# 对afcSpatial数据逐条检验,如果该刷卡数据的位置离该条线路首末站点坐标的距离小于300m,则将其作为首末站点的刷卡数据,对afcSpatial数据进行筛选后得到仅包含首末站点的刷卡数据afcSE

setkey(afcSpatial, bus_id, day, time)
setkey(afcSE, bus_id, day, time)
afcSpatial <- afcSE[afcSpatial, roll = TRUE]
setnames(afcSE, "dirIndex", "dirIndexDel")
afcSpatial <- afcSE[afcSpatial, roll = -Inf]
afcSpatial[is.na(dirIndex), `:=`(dirIndex, ifelse(dirIndexDel == 1, 0, 1))]
afcSpatial <- afcSpatial[is.na(dirIndex), `:=`(dirIndex, sample(0:1, 1))]
afcSpatial[, `:=`(dirIndexDel, NULL)]
# 根据afcSE数据中记录的首末站点的刷卡数据,得到afcSpatial数据中所有刷卡数据的上下行方向记为dirIndex字段

rm(afcSE, afcSENew, afcSEOne, afcSEClust, Near, Dis, i)
# 删除中间数据以节省内存

Step 8: 通过聚类得到公交线路所有站点坐标

getClust <- function(routeStop) {
    routeStopClust <- hclust(dist(data.frame(x = routeStop[, x], y = routeStop[, 
        y])), method = "ward.D2")
    routeStopClust <- cutree(routeStopClust, h = 200)
    return(routeStopClust)
}
# 定义对Step4中初步得到的站点,按照200m的距离,进行聚类的方法

stopSpatial <- afcSpatial[, .(time = mean(time), span = max(time) - min(time), 
    num = .N, lat = mean(lat), lng = mean(lng), x = mean(x), y = mean(y), Trust = mean(trust)), 
    by = .(route_id, bus_id, day, dirIndex, stopIndex)]
# 按照Step4中初步判断的站点划分,对处于同一站点的afc刷卡数据的坐标进行平均后得到数据stopSpatial,用于stopSpatial数据要小于afcSpatial数据,使用stopSpatial数据进行聚类能大幅减少聚类所用的时间和计算距离矩阵产生的内存占用

stopNew <- data.table()
for (i in 1:500) {
    for (j in 1:2) {
        stopSpatialOne <- stopSpatial[route_id == i & dirIndex == j, ]
        stopSpatialClust <- getClust(stopSpatialOne)
        stopSpatialClust <- cbind(stopSpatialOne, Clust = stopSpatialClust)
        stopSpatialClust[, `:=`(Count, sum(num)), by = Clust]
        stopNew <- rbind(stopNew, stopSpatialClust)
    }
}
# 按照线路、上下行方向循环,进行站点聚类

afcSpatial <- left_join(afcSpatial, stopNew[, list(route_id, bus_id, day, dirIndex, 
    stopIndex, Trust, Clust, Count)], by = c("route_id", "bus_id", "day", "dirIndex", 
    "stopIndex"))
# 将站点聚类结果附加至afcSpatial数据中,新增3个字段,Clust为该条刷卡数据所在的站点类,Count为该站点类所包含的刷卡数据的总量,Trust为根据每一条刷卡数据位置的置信度进行平均后计算得到的站点类位置的平均置信度

stopNew <- stopNew %>% group_by(route_id, dirIndex, Clust) %>% summarize(x = mean(x), 
    y = mean(y), lng = mean(lng), lat = mean(lat), Trust = mean(Trust), Count = mean(Count))
# 将stopNew数据分线路、上下行方向、站点类进行平均后最终得到每条线路上下行方向所有站点的位置、置信度、刷卡数据总量
stopNew <- stopNew[Count > 18, ]
# 从中排除9天内刷卡总数不足18的站点,因为如果该“站点”平均每天刷卡上车人数不足两人,该站点可能是因为gps数据错误而产生

rm(stopSpatialOne, stopSpatialClust, i, j)
# 删除中间数据以节省内存

Step 9: 输出Task1结果

stopOut <- stopNew
stopOut[, `:=`(seq, 1:.N), by = .(route_id, dirIndex)]
stopOut <- data.frame(stop_id = 1:nrow(stopOut), route_id = stopOut$route_id, 
    direction = stopOut$dirIndex, seq = stopOut$seq, lng = stopOut$lng, lat = stopOut$lat)
write.table(stopOut, "RESULT_STOP_LIST.csv", sep = ",", row.names = FALSE, col.names = FALSE)

Step 10: 准备Task2数据,即形成每张公交卡每天的公交出行链

setkey(afcSpatial, card_id, day, time)
# 将afcSpatial的主键改为卡号、天、时间
afcSpatial[, `:=`(swipe, .N), by = .(card_id, day)]
# 在afcSpatial中生成swipe字段为每个卡号每天的刷卡次数

afcSpatial[swipe >= 2, `:=`(c("route_id_next", "Clust_next", "lng_next", "lat_next", 
    "x_next", "y_next"), list(c(lead(route_id, 1L)[-.N], .SD[1, route_id]), 
    c(lead(Clust, 1L)[-.N], .SD[1, Clust]), c(lead(lng, 1L)[-.N], .SD[1, lng]), 
    c(lead(lat, 1L)[-.N], .SD[1, lat]), c(lead(x, 1L)[-.N], .SD[1, x]), c(lead(y, 
        1L)[-.N], .SD[1, y]))), by = .(card_id, day)]
# 如果某卡号当天刷卡次数大于等于两次,将下一次刷卡的线路、站点、位置信息加入到上一条刷卡记录中,第一次刷卡的线路、站点、位置信息则加入到最后一次刷卡记录中

Step 11: 处理可以形成公交出行链的数据(寻找离下次出行最近的站点)

afcGood <- afcSpatial[!is.na(route_id_next), ]
# 从afcSpatial中筛选出可以形成公交出行链的数据为afcGood

getAlight <- function(Route_id, DirIndex, Route_id_next, X_next, Y_next) {
    Route <- stopNew[route_id == Route_id, dirIndex == DirIndex, list(x, y, 
        lng, lat)]
    Next <- data.frame(X_next, Y_next)
    Near <- get.knnx(Next, Route[, list(x, y)], k = 1)
    Dis <- min(Near[[2]])
    Index <- which.min(Near[[2]])
    return(c(Route[[Index, "lng"]], Route[[Index, "lat"]], Dis))
}
# 定义针对公交出行链数据计算下车站点的getAlight方法,即选择离下一次出行上车站点距离最近的该次出行所乘坐的公交线路的公交站点作为该次出行的下车站点,如该次出行为上行,则从上行站点中选择,反之亦然。

afcGoodFull <- mapply(getAlight, afcGood$route_id, afcGood$dirIndex, afcGood$route_id_next, 
    afcGood$x_next, afcGood$y_next)
afcGood[, `:=`(c("lng_end", "lat_end", "dis"), list(afcGoodFull[1, ], afcGoodFull[2, 
    ], afcGoodFull[3, ]))]
# 将getAlight方法逐行应用于afcGood数据中,得到每一条数据的下车站点位置及其到下一个上车站点位置的直线距离,通过使用mapply代替for循环,能大幅提高程序运行速度

rm(afcGoodFull)
# 删除中间数据以节省内存

Step 12: 处理无法形成公交出行链的数据(匹配其他日期的类似出行)

afcBad <- afcSpatial[is.na(route_id_next), ]
# 从afcSpatial中筛选出不能形成公交出行链的数据为afcBad

setkey(afcGood, card_id, route_id, time)
setkey(afcBad, card_id, route_id, time)
sum(duplicated(afcGood[, .(card_id, route_id, time)]))
afcGoodJoin <- unique(afcGood, by = c("card_id", "route_id", "time"), fromFirst = TRUE)
afcBad <- afcGoodJoin[, list(card_id, route_id, time, lng_end, lat_end, dis)][afcBad, 
    roll = "nearest"]
# 对于afcBad这一类出行,从afcGood数据中找类似的出行进行匹配,即afcBad数据中出行的下车站点为afcGood数据中同一乘客在其他日期乘坐相同的公交线路且出行时间与该次出行的出行时间最为接近的出行的下车站点。

rm(afcJoin)
# 删除中间数据以节省内存

Step 13: 处理无法形成公交出行链的数据(可能下车站点的随机抽样)

afcBadRemain <- afcBad[is.na(lng_end), ]
# 筛选出通过Step12无法找到类似的出行数据进行匹配的出行作为afcBadRemain数据

alightDis <- afcGood[, .(route_id, dirIndex, Clust, route_id_next, dirIndex_next, 
    Clust_next, lng_end, lat_end)]
alightDis <- alightDis[, sample_n(.SD, 10, replace = TRUE), by = .(route_id, 
    dirIndex, Clust)]
setkey(alightDis, route_id, dirIndex, Clust)
# 在afcGood数据中选择相关字段,并按照线路、上下行方向、站点类进行随机的抽样,作为alightDis数据,由于alightDis对afcGood数据有很好的代表性且数据量小于afcGood数据,因此能够减少下面对潜在下车站点进行随机抽样所花费的时间和占用的内存

assignDis <- function(Route_id, DirIndex, clust) {
    afcGoodOne <- alightDis[.(Route_id, DirIndex, clust), ]
    if (nrow(afcGoodOne) > 0) {
        afcGoodOne <- sample_n(afcGoodOne, size = 1)
        return(c(afcGoodOne$lng_end, afcGoodOne$lat_end))
    } else {
        return(c(NA, NA))
    }
}
# 定义assignDis函数,可以根据afcBadRemain中每一条记录的公交线路、上下行方向、上车站点,在alightDis满足同样公交线路、上下行方向、上车站点的数据中,随机选择一条出行数据的下车站点作为afcBadReamin数据的下车站点。这样保证了afcBadRemain数据中反映出的每一条公交线路的上下车客流分布矩阵与afcGood数据中反映的分布矩阵近似。

alightDisFull <- mapply(assignDis, afcBadRemain$route_id, afcBadRemain$dirIndex, 
    afcBadRemain$Clust)
afcBadRemain[, `:=`(c("lng_end", "lat_end", "dis"), list(alightDisFull[1, ], 
    alightDisFull[2, ], NA))]
# 将assignDis方法逐行应用于afcBadRemain数据中,得到每一条数据的下车站点位置,通过使用mapply代替for循环,能大幅提高程序运行速度

setkey(afcBad, guid)
setkey(afcBadRemain, guid)
afcBad[, `:=`(c("lng_end", "lat_end"), afcBadRemain[.(.SD), .(lng_end, lat_end)])]
# 将afcBadRemain数据中的下车站点对应写入到afcBad数据中

rm(afcBadRemain, alightDis, alightDisFull)
# 删除中间数据以节省内存

Step 14 输出Task2结果

setcolorder(afcBad, names(afcGood))
afcOut <- rbind(afcGood, afcBad)
afcOut <- data.frame(guid = afcOut$guid, lng = afcOut$lng_end, lat = afcOut$lat_end)
write.table(afcOut, "RESULT_ALIGHT_LIST.csv", sep = ",", row.names = FALSE, 
    col.names = FALSE)
save.image(".RData")