diff --git a/assign2.R b/assign2.R
new file mode 100644
index 0000000000000000000000000000000000000000..66530665b367d5d39e731e29089c57506b06391a
--- /dev/null
+++ b/assign2.R
@@ -0,0 +1,262 @@
+rm(list=ls())
+require(lubridate)
+suppressWarnings(require(data.table))
+require(stringr)
+require(geohash)
+library(raster)
+require(dplyr)
+library(sp)
+library(broom)
+library(httr)
+library(rgdal)
+require(sparklyr)
+require(rJava)
+require(readr)
+
+
+first = TRUE
+n = 0
+for(i in 1:12){
+  if(first == TRUE){
+    taxi_jul <- fread("~/yellow_tripdata_2015-07.csv", 
+                      nrows=1000000)
+    taxi_jul <- taxi_jul %>% filter(weekdays(ymd_hms(tpep_pickup_datetime, tz = "America/New_York")) == "Wednesday")
+    n1 = nrow(taxi_jul)
+    
+    first <- FALSE
+  }else{
+    taxi_bind1 <- fread("~/yellow_tripdata_2015-07.csv",
+                        nrows=1000000, skip=n1+1)    
+    names(taxi_bind1) <- names(taxi_jul)
+    
+    taxi_bind1 <- taxi_bind1 %>% filter(weekdays(ymd_hms(tpep_pickup_datetime, tz = "America/New_York")) == "Wednesday")
+    
+    taxi_jul <- bind_rows(taxi_jul, taxi_bind1)
+    
+    n1 = nrow(taxi_jul)
+    
+  }
+}
+
+taxi <- taxi_jul
+rm(taxi_jul)
+
+hm_pickup <- (lubridate::hour(taxi$tpep_pickup_datetime)*60)+as.numeric(lubridate::minute(taxi$tpep_pickup_datetime))
+
+interval_5min <- vector()
+for(j in seq_len(nrow(taxi))){
+  for(i in seq(0, 2350, by = 5)){
+    if(between(hm_pickup[j], i, i+5)){
+      interval_5min[j] = paste0("(", i, " - ", i+5, "]")      
+    } 
+  }
+}
+
+taxi$interval_5min <- interval_5min
+
+#convert dropoff times to ordered factor with monday as 1, sunday as 7
+dropoff_weekday_num <- weekdays(ymd_hms(taxi$tpep_dropoff_datetime, tz = "America/New_York"))
+
+dropoff_weekday_num <- factor(dropoff_weekday_num,
+                              levels = c("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"),
+                              ordered = TRUE)
+pickup_weekday_num <- weekdays(ymd_hms(taxi$tpep_pickup_datetime, tz = "America/New_York"))
+
+pickup_weekday_num <- factor(pickup_weekday_num,
+                             levels = c("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"),
+                             ordered = TRUE)
+
+
+#convert time to seconds and then make cosine and sine versions for smooth transitions between time intervals
+taxi <- taxi %>% dplyr::mutate(time_hour = lubridate::hour(taxi$tpep_pickup_datetime), 
+                               time_num = (((lubridate::hour(taxi$tpep_pickup_datetime)*60*60)+
+                                              (lubridate::minute(taxi$tpep_pickup_datetime)*60)+
+                                              lubridate::second(taxi$tpep_pickup_datetime))/(60*60*24)),
+                               time_num_cosine = cos(time_num*2*pi),
+                               time_num_sine = sin(time_num*2*pi))
+
+#add a feature for weekday as a numeric value - 1 for monday, 7 for sunday
+taxi <- taxi %>% dplyr::mutate(weekday = pickup_weekday_num,
+                               weekday_num = (as.numeric(weekday)+time_num)/7,
+                               weekday_num_cosine = cos(weekday_num*2*pi),
+                               weekday_num_sine = sin(weekday_num*2*pi))
+
+#time difference between pickup and dropoff in seconds
+taxi <- taxi %>% dplyr::mutate(time_diff_secs = 
+                                 ((lubridate::hour(taxi$tpep_dropoff_datetime)*60*60)+
+                                    (lubridate::minute(taxi$tpep_dropoff_datetime)*60)+
+                                    lubridate::second(taxi$tpep_dropoff_datetime)) - 
+                                 ((lubridate::hour(taxi$tpep_pickup_datetime)*60*60)+
+                                    (lubridate::minute(taxi$tpep_pickup_datetime)*60)+
+                                    lubridate::second(taxi$tpep_pickup_datetime)))
+
+#create week as numeric value
+taxi <- taxi %>% dplyr::mutate(weekofyear = lubridate::week(tpep_pickup_datetime),
+                               weekofyear_num = (weekofyear+weekday_num)/53,
+                               weekofyear_num_cosine = cos(weekofyear_num*2*pi),
+                               weekofyear_num_sine = sin(weekofyear_num*2*pi))
+
+
+#remove impossible and unlikely values
+taxi <- taxi %>% dplyr::filter(time_diff_secs > 0 & time_diff_secs/60 < 300) 
+#convert time difference feature to be between 0 and 1 (scaled by max)
+taxi <- taxi %>% dplyr::mutate(timediffsec_num = time_diff_secs/max(time_diff_secs))
+
+
+#miles to kms
+taxi$trip_distance <- taxi$trip_distance*1.609344
+
+#outlier for trip distance, just remove as don't know how else to deal with it
+taxi <- taxi %>% dplyr::filter(trip_distance > 0 & trip_distance <  150)
+#remove longitude and latitude equal to 0
+taxi <- taxi %>% dplyr::filter(pickup_longitude != 0 | pickup_latitude != 0)
+
+
+taxi <-  taxi %>% dplyr::filter(trip_distance > 0)
+taxi <-  taxi %>% dplyr::filter(total_amount < 200)
+
+
+#data features inspired by https://sdaulton.github.io/TaxiPrediction/
+#specifically geohashing, find out this is best in analysis below comparing to boroughs and neighbourhoods
+#Also numeric values between 0 and 1 for time of day, day of week, week of year
+#And extra features converting these scaled numeric values functions of sine and cosine
+#to make for a more smooth transition between time periods 
+
+taxi$pickup_geohash <-gh_encode(taxi$pickup_latitude, taxi$pickup_longitude, 5)
+
+#scale trip distance to be between 0 and 1
+taxi$trip_distance_num <- taxi$trip_distance/max(taxi$trip_distance)
+taxi$RatecodeID <- as.factor(taxi$RatecodeID)
+taxi$passenger_count <- as.factor(taxi$passenger_count)
+
+#scale total amount to be between 0 and 1
+taxi$total_amount_num <- taxi$total_amount/max(taxi$total_amount)
+
+
+#59 unique geohashes for pickups
+taxi %>% count(pickup_geohash)
+
+r <- GET('http://data.beta.nyc//dataset/0ff93d2d-90ba-457c-9f7e-39e47bf2ac5f/resource/35dd04fb-81b3-479b-a074-a27a37888ce7/download/d085e2f8d0b54d4590b1e7d1f35594c1pediacitiesnycneighborhoods.geojson')
+
+nyc_neighborhoods <- readOGR(content(r,'text'), 'OGRGeoJSON', verbose = F)
+
+taxi <- as.data.frame(taxi)
+taxi_spdf <- SpatialPointsDataFrame(taxi[,c('pickup_longitude', 'pickup_latitude')], 
+                                    proj4string = CRS("+proj=longlat +datum=WGS84 +no_defs +ellps=WGS84 +towgs84=0,0,0"),
+                                    data=taxi)
+taxi_spdf <- over(taxi_spdf, nyc_neighborhoods[,c('neighborhood', 'borough')])
+taxi$neighbourhood <- taxi_spdf$neighborhood
+taxi$borough <- taxi_spdf$borough
+
+glimpse(taxi)
+taxi %>% group_by(neighbourhood, interval_5min) %>% count() %>% arrange(desc(n)) %>% 
+  group_by(neighbourhood) %>% summarise(n = sum(n)) %>% arrange(desc(n))
+
+#incredible that there is upward of 500 pickups in 5 minute intervals on a wednesday in Midtown in NYC
+taxi %>% filter(neighbourhood %in% c("Midtown", "Upper East Side", "Chelsea", "Upper West Side",
+                                     "Hell's Kitchen")) %>% count(interval_5min) %>% 
+  ggplot() + geom_col(aes(x=interval_5min, y=n)) +
+  theme(axis.text.x = element_text(angle = 55, hjust = 1)) + ggtitle("Pickup density for Midtown on Wednesday's in July of 2015")
+
+tt <- taxi %>% filter(neighbourhood %in% c("Midtown", "Upper East Side")) %>% group_by(neighbourhood, interval_5min) %>% 
+  count() %>% ungroup(taxi)
+taxi2 <- taxi %>% filter(neighbourhood %in% c("Midtown", "Upper East Side"))
+taxi3 <- taxi %>% filter(neighbourhood %in% c("Midtown", "Upper East Side"))
+dim(taxi2)
+sort(tt$n, decreasing = T)
+pickup_n <- vector()
+tt$neighbourhood <-  as.character(tt$neighbourhood)
+tt_mid <- tt %>% filter(neighbourhood == "Midtown")
+names(tt_mid)[3] <- "n1"
+tt_ues <- tt %>% filter(neighbourhood == "Upper East Side")
+names(tt_ues)[2:3] <- c("neighbourhood2", "n2")
+tt_ues <- tt_ues %>% dplyr::select(neighbourhood2, n2)
+tt <- cbind(tt_mid, tt_ues)
+head(tt)
+for(i in 1:nrow(taxi2)){
+  for(j in 1:length(unique(tt$interval_5min))){
+    if(taxi2$interval_5min[i] == tt$interval_5min[j]){
+      if(taxi2$neighbourhood[i] == tt$neighbourhood[j]){
+        pickup_n[i] = tt$n1[j]
+      }else{
+        pickup_n[i] = tt$n2[j]
+      }        
+    }
+  }
+}
+taxi2$pickup_n <- pickup_n
+glimpse(taxi2)
+taxi2 %>% filter(neighbourhood == "Midtown") %>% count(interval_5min) %>% 
+  ggplot() + geom_col(aes(x=reorder(interval_5min, n), y=n)) +
+  theme(axis.text.x=element_blank(), axis.ticks.x=element_blank()) + xlab("Time")+ggtitle("Midtown")
+taxi2 %>% filter(neighbourhood == "Upper East Side") %>% count(interval_5min) %>% 
+  ggplot() + geom_col(aes(x=reorder(interval_5min, n), y=n)) + theme(axis.text.x=element_blank(),
+        axis.ticks.x=element_blank()) +ggtitle("Upper East Side") + xlab("Time")
+
+taxi2 %>% filter(neighbourhood == "Midtown") %>% count(interval_5min) %>% mutate(sd = sd(n))
+taxi2 %>% filter(neighbourhood == "Upper East Side") %>% count(interval_5min) %>% mutate(sd = sd(n))
+
+glimpse(taxi2)
+taxi2$VendorID <- as.factor(taxi2$VendorID) 
+taxi2$payment_type <- as.factor(taxi2$payment_type) 
+
+taxi2 <- taxi2 %>% dplyr::select(VendorID, passenger_count, RatecodeID, payment_type, time_num, time_num_cosine, time_num_sine,
+                                 weekday_num, weekday_num_cosine, weekday_num_sine, timediffsec_num, trip_distance_num,
+                                 total_amount_num, neighbourhood, interval_5min, pickup_n)
+
+taxi2 %>% filter(neighbourhood == "Midtown") %>% group_by(interval_5min) %>%
+  summarise(tan = median(total_amount_num), tdn = median(trip_distance_num), tdsn = median(timediffsec_num),
+            woyns = mean(weekofyear_num_sine), woync = mean(weekofyear_num_cosine), woyn = median(weekofyear_num),
+            tns = mean(time_num_sine), tnc = mean(time_num_cosine), tn = median(time_num))
+
+Y <- taxi %>% filter(neighbourhood == "Midtown") %>% count(pickup_halfhr_cat) %>% dplyr::select(n)
+taxi <- taxi %>% dplyr::mutate(Y = Y$n)  
+
+taxi2 %>% filter(neighbourhood == "Midtown") %>% 
+  ggplot() + geom_col(aes(x=interval_5min, y=pickup_n))
+write.csv(taxi2, "D:/R/TaxiAssignment/taxi_ml_format.csv")
+
+
+#######SPARK
+cluster_url <- paste0("spark://", system("hostname -i", intern = TRUE), ":7077")
+library(sparklyr)
+library(readr)
+library(dplyr)
+sc <- spark_connect(master = cluster_url)
+spark_read_csv(sc, "taxi_ml_format.csv", path = "")
+taxi_ml_format <- read_csv("taxi_ml_format.csv")
+taxi_ml_format <- taxi_ml_format %>% select(-X1)
+X_data <- taxi_ml_format 
+
+
+X_data$VendorID <- as.factor(X_data$VendorID)
+X_data$RatecodeID <- as.factor(X_data$RatecodeID)
+X_data$payment_type <- as.factor(X_data$payment_type)
+X_data$passenger_count <- as.factor(X_data$passenger_count)
+
+X_tbl <- copy_to(sc, X_data, "x_data", overwrite=T)
+
+partitions <- X_tbl %>%
+  sdf_partition(training = 0.75, test = 0.25, seed = 1099)
+
+taxi_training <- partitions$training
+taxi_test <- partitions$test
+
+#baseline
+lm <- ml_linear_regression(taxi_training, pickup_n~.)
+
+rf_model <- taxi_training %>%
+  ml_random_forest(pickup_n ~ ., type = "regression")
+
+pred_lm <- sdf_predict(taxi_test, lm)
+pred_rf <- sdf_predict(taxi_test, rf_model)
+
+ml_regression_evaluator(pred_lm, label_col = "pickup_n", prediction_col = "prediction", metric_name="r2")
+ml_regression_evaluator(pred_rf, label_col = "pickup_n", prediction_col = "prediction", metric_name="r2")
+ml_regression_evaluator(pred_lm, label_col = "pickup_n", prediction_col = "prediction", metric_name="rmse")
+ml_regression_evaluator(pred_rf, label_col = "pickup_n", prediction_col = "prediction", metric_name="rmse")
+
+imp <- sparklyr::ml_feature_importances(rf_model)
+
+
+spark_disconnect(sc)
\ No newline at end of file