啟用 SparkR
# Set the SparkR environment variable: fall back to the default install
# location when SPARK_HOME is unset or empty
if (!nzchar(Sys.getenv("SPARK_HOME"))) {
  Sys.setenv(SPARK_HOME = "/usr/local/spark")
}
# Load SparkR from the R library directory under SPARK_HOME
library(
  SparkR,
  lib.loc = file.path(Sys.getenv("SPARK_HOME"), "R", "lib")
)
##
## Attaching package: 'SparkR'
## The following objects are masked from 'package:stats':
##
## cov, filter, lag, na.omit, predict, sd, var, window
## The following objects are masked from 'package:base':
##
## as.data.frame, colnames, colnames<-, drop, endsWith,
## intersect, rank, rbind, sample, startsWith, subset, summary,
## transform, union
# Start a Spark session on the local machine
sparkR.session(
  master = "local[*]",
  sparkConfig = list(
    spark.executor.memory = "600m",
    spark.sql.shuffle.partitions = "3",
    spark.default.parallelism = "2"
  )
)
## Spark package found in SPARK_HOME: /usr/local/spark
## Launching java with spark-submit command /usr/local/spark/bin/spark-submit sparkr-shell /tmp/RtmpIZpvW2/backend_port635765311b39
## Java ref type org.apache.spark.sql.SparkSession id 1
# To use a cluster instead,
# set master to spark://master:7077
載入資料
# Download the sample CSV into the working directory, then read it into a
# local R data.frame
download.file(
  url = "https://github.com/ywchiu/sparkr/raw/master/data/diabetic_sample.csv",
  destfile = "diabetic_sample.csv"
)
diabetes <- read.csv(file = "diabetic_sample.csv")
建立 Data Frame
# Show the dimensions (rows, columns) of the local data.frame
dim(diabetes)
## [1] 500 51
# Convert the local data.frame into a SparkDataFrame. Column names that
# contain "." trigger warnings and show up with "_" in the resulting schema.
diabetes_sparkdf <- as.DataFrame(diabetes)
## Warning in FUN(X[[i]], ...): Use glyburide_metformin instead of
## glyburide.metformin as column name
## Warning in FUN(X[[i]], ...): Use glipizide_metformin instead of
## glipizide.metformin as column name
## Warning in FUN(X[[i]], ...): Use glimepiride_pioglitazone instead of
## glimepiride.pioglitazone as column name
## Warning in FUN(X[[i]], ...): Use metformin_rosiglitazone instead of
## metformin.rosiglitazone as column name
## Warning in FUN(X[[i]], ...): Use metformin_pioglitazone instead of
## metformin.pioglitazone as column name
# Inspect the data: print each column's name, type and nullability
printSchema(diabetes_sparkdf)
## root
## |-- X: integer (nullable = true)
## |-- encounter_id: integer (nullable = true)
## |-- patient_nbr: integer (nullable = true)
## |-- race: string (nullable = true)
## |-- gender: string (nullable = true)
## |-- age: string (nullable = true)
## |-- weight: string (nullable = true)
## |-- admission_type_id: integer (nullable = true)
## |-- discharge_disposition_id: integer (nullable = true)
## |-- admission_source_id: integer (nullable = true)
## |-- time_in_hospital: integer (nullable = true)
## |-- payer_code: string (nullable = true)
## |-- medical_specialty: string (nullable = true)
## |-- num_lab_procedures: integer (nullable = true)
## |-- num_procedures: integer (nullable = true)
## |-- num_medications: integer (nullable = true)
## |-- number_outpatient: integer (nullable = true)
## |-- number_emergency: integer (nullable = true)
## |-- number_inpatient: integer (nullable = true)
## |-- diag_1: string (nullable = true)
## |-- diag_2: string (nullable = true)
## |-- diag_3: string (nullable = true)
## |-- number_diagnoses: integer (nullable = true)
## |-- max_glu_serum: string (nullable = true)
## |-- A1Cresult: string (nullable = true)
## |-- metformin: string (nullable = true)
## |-- repaglinide: string (nullable = true)
## |-- nateglinide: string (nullable = true)
## |-- chlorpropamide: string (nullable = true)
## |-- glimepiride: string (nullable = true)
## |-- acetohexamide: string (nullable = true)
## |-- glipizide: string (nullable = true)
## |-- glyburide: string (nullable = true)
## |-- tolbutamide: string (nullable = true)
## |-- pioglitazone: string (nullable = true)
## |-- rosiglitazone: string (nullable = true)
## |-- acarbose: string (nullable = true)
## |-- miglitol: string (nullable = true)
## |-- troglitazone: string (nullable = true)
## |-- tolazamide: string (nullable = true)
## |-- examide: string (nullable = true)
## |-- citoglipton: string (nullable = true)
## |-- insulin: string (nullable = true)
## |-- glyburide_metformin: string (nullable = true)
## |-- glipizide_metformin: string (nullable = true)
## |-- glimepiride_pioglitazone: string (nullable = true)
## |-- metformin_rosiglitazone: string (nullable = true)
## |-- metformin_pioglitazone: string (nullable = true)
## |-- change: string (nullable = true)
## |-- diabetesMed: string (nullable = true)
## |-- readmitted: integer (nullable = true)
# List the column names of the SparkDataFrame
columns(diabetes_sparkdf)
## [1] "X" "encounter_id"
## [3] "patient_nbr" "race"
## [5] "gender" "age"
## [7] "weight" "admission_type_id"
## [9] "discharge_disposition_id" "admission_source_id"
## [11] "time_in_hospital" "payer_code"
## [13] "medical_specialty" "num_lab_procedures"
## [15] "num_procedures" "num_medications"
## [17] "number_outpatient" "number_emergency"
## [19] "number_inpatient" "diag_1"
## [21] "diag_2" "diag_3"
## [23] "number_diagnoses" "max_glu_serum"
## [25] "A1Cresult" "metformin"
## [27] "repaglinide" "nateglinide"
## [29] "chlorpropamide" "glimepiride"
## [31] "acetohexamide" "glipizide"
## [33] "glyburide" "tolbutamide"
## [35] "pioglitazone" "rosiglitazone"
## [37] "acarbose" "miglitol"
## [39] "troglitazone" "tolazamide"
## [41] "examide" "citoglipton"
## [43] "insulin" "glyburide_metformin"
## [45] "glipizide_metformin" "glimepiride_pioglitazone"
## [47] "metformin_rosiglitazone" "metformin_pioglitazone"
## [49] "change" "diabetesMed"
## [51] "readmitted"
SparkR 操作
# select: keep only the readmitted and gender columns, show the first rows
head(
  select(diabetes_sparkdf, "readmitted", "gender")
)
## readmitted gender
## 1 0 Male
## 2 0 Female
## 3 0 Female
## 4 0 Female
## 5 0 Female
## 6 0 Female
# distinct: show the first unique values of the age column
head(
  distinct(
    select(diabetes_sparkdf, "age")
  )
)
## age
## 1 [60-70)
## 2 [30-40)
## 3 [80-90)
## 4 [90-100)
## 5 [20-30)
## 6 [70-80)
# filter: rows where insulin is "Up" and readmitted is 1; print 3 of them
showDF(
  filter(
    diabetes_sparkdf,
    diabetes_sparkdf$insulin == "Up" & diabetes_sparkdf$readmitted == 1
  ),
  numRows = 3
)
## +-----+------------+-----------+---------+------+-------+------+-----------------+------------------------+-------------------+----------------+----------+-----------------+------------------+--------------+---------------+-----------------+----------------+----------------+------+------+------+----------------+-------------+---------+---------+-----------+-----------+--------------+-----------+-------------+---------+---------+-----------+------------+-------------+--------+--------+------------+----------+-------+-----------+-------+-------------------+-------------------+------------------------+-----------------------+----------------------+------+-----------+----------+
## | X|encounter_id|patient_nbr| race|gender| age|weight|admission_type_id|discharge_disposition_id|admission_source_id|time_in_hospital|payer_code|medical_specialty|num_lab_procedures|num_procedures|num_medications|number_outpatient|number_emergency|number_inpatient|diag_1|diag_2|diag_3|number_diagnoses|max_glu_serum|A1Cresult|metformin|repaglinide|nateglinide|chlorpropamide|glimepiride|acetohexamide|glipizide|glyburide|tolbutamide|pioglitazone|rosiglitazone|acarbose|miglitol|troglitazone|tolazamide|examide|citoglipton|insulin|glyburide_metformin|glipizide_metformin|glimepiride_pioglitazone|metformin_rosiglitazone|metformin_pioglitazone|change|diabetesMed|readmitted|
## +-----+------------+-----------+---------+------+-------+------+-----------------+------------------------+-------------------+----------------+----------+-----------------+------------------+--------------+---------------+-----------------+----------------+----------------+------+------+------+----------------+-------------+---------+---------+-----------+-----------+--------------+-----------+-------------+---------+---------+-----------+------------+-------------+--------+--------+------------+----------+-------+-----------+-------+-------------------+-------------------+------------------------+-----------------------+----------------------+------+-----------+----------+
## |34253| 107381526| 90683892|Caucasian| Male|[70-80)| null| 2| 22| 7| 14| null| Emergency/Trauma| 66| 3| 30| 0| 0| 0| 997| 682| 785| 9| None| None| No| No| No| No| No| No| No| No| No| No| No| No| No| No| No| No| No| Up| No| No| No| No| No| Ch| Yes| 1|
## |86618| 275606604| 67982049|Caucasian| Male|[80-90)| null| 1| 3| 7| 2| MC| null| 58| 0| 12| 0| 1| 2| 599| 348| 250| 8| None| None| No| No| No| No| No| No| No| No| No| No| No| No| No| No| No| No| No| Up| No| No| No| No| No| Ch| Yes| 1|
## |94557| 349632332| 175353413|Caucasian|Female|[50-60)| null| 1| 5| 7| 3| SP| Emergency/Trauma| 64| 0| 8| 0| 0| 0| 967| 250.8| 980| 9| None| None| Steady| No| No| No| No| No| No| No| No| Steady| No| No| No| No| No| No| No| Up| No| No| No| No| No| Ch| Yes| 1|
## +-----+------------+-----------+---------+------+-------+------+-----------------+------------------------+-------------------+----------------+----------+-----------------+------------------+--------------+---------------+-----------------+----------------+----------------+------+------+------+----------------+-------------+---------+---------+-----------+-----------+--------------+-----------+-------------+---------+---------+-----------+------------+-------------+--------+--------+------------+----------+-------+-----------+-------+-------------------+-------------------+------------------------+-----------------------+----------------------+------+-----------+----------+
## only showing top 3 rows
# arrange: sort by number_diagnoses in descending order, show the first rows
head(
  arrange(diabetes_sparkdf, "number_diagnoses", decreasing = TRUE)
)
## X encounter_id patient_nbr race gender age weight
## 1 101118 434999606 94629978 AfricanAmerican Male [40-50) NA
## 2 48825 148510140 55211661 AfricanAmerican Male [50-60) NA
## 3 101398 438566858 106174269 Caucasian Male [70-80) NA
## 4 72800 213821130 38092113 Other Female [90-100) NA
## 5 76182 229632420 70956018 Caucasian Female [80-90) NA
## 6 62313 173681550 94618368 Caucasian Male [80-90) NA
## admission_type_id discharge_disposition_id admission_source_id
## 1 1 1 7
## 2 1 3 7
## 3 3 1 1
## 4 3 3 1
## 5 2 6 7
## 6 1 22 7
## time_in_hospital payer_code medical_specialty num_lab_procedures
## 1 2 CP <NA> 46
## 2 5 MD <NA> 41
## 3 2 MC <NA> 11
## 4 4 MC <NA> 38
## 5 8 MC Family/GeneralPractice 67
## 6 5 MC <NA> 33
## num_procedures num_medications number_outpatient number_emergency
## 1 0 12 0 0
## 2 3 13 0 0
## 3 3 16 0 0
## 4 1 11 0 1
## 5 0 21 0 0
## 6 4 12 0 0
## number_inpatient diag_1 diag_2 diag_3 number_diagnoses max_glu_serum
## 1 0 780 496 786 10 None
## 2 1 415 403 285 9 None
## 3 3 403 427 414 9 None
## 4 0 787 276 707 9 None
## 5 1 428 427 486 9 None
## 6 0 821 428 285 9 None
## A1Cresult metformin repaglinide nateglinide chlorpropamide glimepiride
## 1 >7 Steady No No No No
## 2 None No No No No No
## 3 None No No No No No
## 4 None No No No No No
## 5 None No No No No No
## 6 None No No No No No
## acetohexamide glipizide glyburide tolbutamide pioglitazone rosiglitazone
## 1 No No No No No No
## 2 No No No No No No
## 3 No No No No No No
## 4 No No No No No No
## 5 No No No No No No
## 6 No No No No No No
## acarbose miglitol troglitazone tolazamide examide citoglipton insulin
## 1 No No No No No No No
## 2 No No No No No No No
## 3 No No No No No No Steady
## 4 No No No No No No No
## 5 No No No No No No Steady
## 6 No No No No No No No
## glyburide_metformin glipizide_metformin glimepiride_pioglitazone
## 1 No No No
## 2 No No No
## 3 No No No
## 4 No No No
## 5 No No No
## 6 No No No
## metformin_rosiglitazone metformin_pioglitazone change diabetesMed
## 1 No No No Yes
## 2 No No No No
## 3 No No No Yes
## 4 No No No No
## 5 No No No Yes
## 6 No No No No
## readmitted
## 1 0
## 2 0
## 3 0
## 4 0
## 5 0
## 6 0
# Sort by time_in_hospital (descending), then by num_medications (ascending)
head(
  arrange(
    diabetes_sparkdf,
    desc(diabetes_sparkdf$time_in_hospital),
    diabetes_sparkdf$num_medications
  )
)
## X encounter_id patient_nbr race gender age weight
## 1 38304 119026476 24497235 AfricanAmerican Female [80-90) NA
## 2 76731 232016922 43078113 Caucasian Male [50-60) NA
## 3 34253 107381526 90683892 Caucasian Male [70-80) NA
## 4 63278 175962186 23281767 AfricanAmerican Female [60-70) NA
## 5 77213 234058170 68505696 Caucasian Male [50-60) NA
## 6 10753 45218502 18494559 Caucasian Male [60-70) NA
## admission_type_id discharge_disposition_id admission_source_id
## 1 2 3 1
## 2 3 1 1
## 3 2 22 7
## 4 1 3 7
## 5 3 1 1
## 6 1 18 7
## time_in_hospital payer_code medical_specialty num_lab_procedures
## 1 14 MC Nephrology 39
## 2 14 SP <NA> 61
## 3 14 <NA> Emergency/Trauma 66
## 4 14 MD Nephrology 42
## 5 13 CP Psychiatry 37
## 6 13 <NA> <NA> 66
## num_procedures num_medications number_outpatient number_emergency
## 1 4 14 1 0
## 2 0 19 0 0
## 3 3 30 0 0
## 4 6 33 4 0
## 5 0 10 0 0
## 6 0 15 0 0
## number_inpatient diag_1 diag_2 diag_3 number_diagnoses max_glu_serum
## 1 0 996 790 403 6 None
## 2 0 296 585 427 8 None
## 3 0 997 682 785 9 None
## 4 0 730 324 723 9 None
## 5 0 296 298 250 9 None
## 6 2 998 682 428 8 None
## A1Cresult metformin repaglinide nateglinide chlorpropamide glimepiride
## 1 None No No No No No
## 2 >7 No No No No No
## 3 None No No No No No
## 4 None No No No No No
## 5 None No No No No No
## 6 None No No No No No
## acetohexamide glipizide glyburide tolbutamide pioglitazone rosiglitazone
## 1 No No No No No No
## 2 No No No No No No
## 3 No No No No No No
## 4 No No No No No No
## 5 No No No No No No
## 6 No No No No No No
## acarbose miglitol troglitazone tolazamide examide citoglipton insulin
## 1 No No No No No No No
## 2 No No No No No No Down
## 3 No No No No No No Up
## 4 No No No No No No Steady
## 5 No No No No No No No
## 6 No No No No No No No
## glyburide_metformin glipizide_metformin glimepiride_pioglitazone
## 1 No No No
## 2 No No No
## 3 No No No
## 4 No No No
## 5 No No No
## 6 No No No
## metformin_rosiglitazone metformin_pioglitazone change diabetesMed
## 1 No No No No
## 2 No No Ch Yes
## 3 No No Ch Yes
## 4 No No No Yes
## 5 No No No No
## 6 No No No No
## readmitted
## 1 0
## 2 0
## 3 1
## 4 0
## 5 0
## 6 0
# mutate: derive a new column from existing ones
# First narrow the data down to the specialty, gender and the three visit
# counters (outpatient, emergency, inpatient).
number_visits <- select(
  diabetes_sparkdf,
  diabetes_sparkdf$medical_specialty,
  diabetes_sparkdf$gender,
  diabetes_sparkdf$number_outpatient,
  diabetes_sparkdf$number_emergency,
  diabetes_sparkdf$number_inpatient
)
# total_visits is the sum of all three selected visit counters; leaving
# number_inpatient out would under-report the total.
head(
  mutate(
    number_visits,
    total_visits = number_visits$number_outpatient +
      number_visits$number_emergency +
      number_visits$number_inpatient
  )
)
## medical_specialty gender number_outpatient number_emergency
## 1 Emergency/Trauma Male 0 0
## 2 Family/GeneralPractice Female 0 0
## 3 <NA> Female 0 0
## 4 Gastroenterology Female 0 1
## 5 Surgery-General Female 0 0
## 6 Emergency/Trauma Female 0 0
## number_inpatient total_visits
## 1 1 1
## 2 1 1
## 3 0 0
## 4 0 1
## 5 0 0
## 6 0 0
## summarize (or agg) / groupBy (or group_by)
# Condition used both as the grouping key and as the counted expression:
# insulin is "Up" and readmitted is 1
up_and_readmitted <- diabetes_sparkdf$insulin == "Up" &
  diabetes_sparkdf$readmitted == 1
# Count the rows falling into each value (TRUE / FALSE) of the condition
head(
  summarize(
    groupBy(diabetes_sparkdf, up_and_readmitted),
    count = n(up_and_readmitted)
  )
)
## ((insulin = Up) AND (readmitted = 1.0)) count
## 1 FALSE 494
## 2 TRUE 6
# Group on the same condition (insulin is "Up" and readmitted is 1), but this
# time the "count" column holds the sum of readmitted within each group
insulin_up_readmitted <- diabetes_sparkdf$insulin == "Up" &
  diabetes_sparkdf$readmitted == 1
head(
  summarize(
    groupBy(diabetes_sparkdf, insulin_up_readmitted),
    count = sum(diabetes_sparkdf$readmitted)
  )
)
## ((insulin = Up) AND (readmitted = 1.0)) count
## 1 FALSE 51
## 2 TRUE 6
# Sum number_diagnoses per age group, then sort the result by age and show
# the first rows
diagnoses_by_age <- agg(
  groupBy(diabetes_sparkdf, diabetes_sparkdf$age),
  total_diagnoses = sum(diabetes_sparkdf$number_diagnoses)
)
head(arrange(diagnoses_by_age, diabetes_sparkdf$age))
## age total_diagnoses
## 1 [10-20) 11
## 2 [20-30) 66
## 3 [30-40) 94
## 4 [40-50) 349
## 5 [50-60) 638
## 6 [60-70) 894
使用 magrittr
library(magrittr)
# Pipeline form: total number_diagnoses per age group, sorted by age,
# first rows only
diabetes_sparkdf %>%
  groupBy(diabetes_sparkdf$age) %>%
  summarize(total_diagnoses = sum(diabetes_sparkdf$number_diagnoses)) %>%
  arrange(diabetes_sparkdf$age) %>%
  head()