$Fangzheng$ $Lyu^{1,2}$, $Zewei$ $Xu^{1,2}$, $Xinlin$ $Ma^{3}$, $Shaohua$ $Wang^{1,2}$, $Zhiyu$ $Li^{1,2}$, $Shaowen$ $Wang^{1,2}$
$^{1}$$Department$ $of$ $Geography$ $and$ $Geographic$ $Information$ $Science$, $University$ $of$ $Illinois$ $at$ $Urbana-Champaign$, $Urbana$, $IL$, $USA$
$^{2}$$CyberGIS$ $Center$ $for$ $Advanced$ $Digital$ $and$ $Spatial$ $Studies$, $University$ $of$ $Illinois$ $at$ $Urbana-Champaign$, $Urbana$, $IL$, $USA$
$^{3}$$Department$ $of$ $Management$ $and$ $Urban$ $Planning$, $Tsinghua$ $University$, $Beijing$, $China$
$Corresponding$ $Author:$ $shaowen@illinois.edu$
Drainage network analysis is fundamental to understanding the characteristics of surface hydrology. Based on elevation data, drainage network analysis is often used to extract key hydrological features like drainage networks and streamlines. Limited by raster-based data models, conventional drainage network algorithms typically allow water to flow in 4 or 8 directions (surrounding grids) from a raster grid. To resolve this limitation, this paper describes a new vector-based method for drainage network analysis that allows water to flow in any direction around each location. The method is enabled by rapid advances in Light Detection and Ranging (LiDAR) remote sensing and high-performance computing. The drainage network analysis is conducted using a high-density point cloud instead of Digital Elevation Models (DEMs) at coarse resolutions. Our computational experiments show that the vector-based method can better capture water flows without limiting the number of directions due to imprecise DEMs. Our case study applies the method to Rowan County watershed, North Carolina in the US. After comparing the drainage networks and streamlines detected with corresponding reference data from US Geological Survey generated from the Geonet software, we find that the new method performs well in capturing the characteristics of water flows on landscape surfaces in order to form an accurate drainage network.
This notebook is a sample notebook for running a small size dataset (from the LiDAR dataset of Rowan Watershed) with our new method for drainage system analysis. The algorithm is implemented with an execuation sample dataset.
Paper:
Fangzheng Lyu, Zewei Xu, Xinlin Ma, Shaohua Wang, Zhiyu Li, Shaowen Wang, A vector-based method for drainage network analysis based on LiDAR data, Computers & Geosciences, Volume 156, 2021, 104892, ISSN 0098-3004
https://www.sciencedirect.com/science/article/pii/S0098300421001849
Github repo:
https://github.com/cybergis/Drainage-System-Analysis-Research
The Library used for in the algorithm is set up here
%load_ext pycodestyle_magic
#%pycodestyle_on
%flake8_on --ignore E501,E251,E231,E225
Import library
import os
import datetime
import laspy
import numpy as np
import pandas as pd
from numpy.linalg import inv
import math
import sys
# var1 is the starting location in the x-axis
var1 = 5
# var2 is the starting location in the y-axis
var2 = 5
# var3 is the angle different we want
var3 = 30
import os
from shutil import copyfile
import gdown
# download sample data if it is not local
local_sample_path = "./3_2.las"
if not os.path.isfile(local_sample_path):
print("Missing sample data!")
sample_shared = "/home/jovyan/shared_data/data/drainage_system_analysis_research/3_2.las"
if os.path.join(sample_shared):
print("Copying sample data from shared folder...")
copyfile(sample_shared, local_sample_path)
else:
print("Copying sample data from shared google drive...")
url_gdrive = 'https://drive.google.com/uc?id=1JOl1IylIZg72QdMM89xs10NLFk4rObml'
gdown.download(url_gdrive, local_sample_path, quiet=False)
else:
print("Sample data already exists")
if not os.path.isfile(local_sample_path):
print("Can not retrieve sample data!")
!ls {local_sample_path} -alh
# Store the LiDAR data as infile
infile = laspy.file.File(local_sample_path, mode="r")
# Get the value for x axis, y axis, and elevation for LiDAR point cloud
ground_x = infile.x
ground_y = infile.y
ground_z = infile.z
# Normalize the output for the LiDAR file
ground_x_2 = ground_x-ground_x.min()
ground_y_2 = ground_y-ground_y.min()
The total amount of LiDAR point in the LiDAR file
len(ground_z)
The x, y, z value for points in the LiDAR file are stored
x = ground_x_2
y = ground_y_2
z = ground_z
Use a dictionary to store the datset
threedarray = np.vstack((x,y,z)).T
dictionary = pd.Series(threedarray.tolist(), index=map(lambda a: round(a,2), x.tolist()))
An example of the format of the dataset
dictionary[1.11]
Create the index for each LiDAR data point
list_a = map(lambda a: 10000*int(a), x.tolist())
list_b = map(lambda a: int(a), y.tolist())
index_list = [sum(x) for x in zip(list_a, list_b)]
Generate the hash table and use dictionary data structure to store the dataset
grid_dictionary = pd.Series(threedarray.tolist(), index=index_list)
Get a overview of the dataset
grid_dictionary
This is the function used in the algorithm to find the elevation of any given point (x, y) using bilinear interpolation.
def find_elevation_new(x, y):
# bilinear interpolation
storage = []
# curr_index = 10000*int(x)+int(y)
diff = 0
while(len(storage)<4):
# Find all the data within the nearby grid
temp = []
# For loop here to find candidate points for bilinear interpolation
for i in range(int(x-diff), int(x+diff+1)):
for j in range(int(y-diff), int(y+diff+1)):
if (i==int(x-diff) or i==int(x+diff) or j==int(y-diff) or j==int(y+diff)):
try:
rt = grid_dictionary[10000*i+j]
if (type(rt)==list):
temp.append(rt)
else:
for it in range(0,len(rt)):
temp.append(rt[it])
except Exception:
pass
temp.sort(key = lambda e: (e[0]-x)*(e[0]-x)+(e[1]-y)*(e[1]-y))
if (len(storage)+len(temp) <= 4):
# add them
for i in range(0,len(temp)):
storage.append(temp[i])
else:
k = 0
while(len(storage) != 4):
storage.append(temp[k])
k = k+1
diff = diff+1
# find the points used for data interpolation
new_storage = storage[:4]
a = [[1,new_storage[0][0], new_storage[0][1], new_storage[0][0]*new_storage[0][1]],
[1, new_storage[1][0], new_storage[1][1], new_storage[1][0]*new_storage[1][1]],
[1, new_storage[2][0], new_storage[2][1], new_storage[2][0]*new_storage[2][1]],
[1, new_storage[3][0], new_storage[3][1], new_storage[3][0]*new_storage[3][1]]]
b = [new_storage[0][2], new_storage[1][2], new_storage[2][2], new_storage[3][2]]
try:
# Conduct bilinear interpolation
coef_matrix = np.matmul(inv(a), b)
rt = coef_matrix[0]+coef_matrix[1]*x+coef_matrix[2]*y+coef_matrix[3]*x*y
return rt
except Exception:
return 10000
This is the major step in the algorithm that is used for simulation of the water flow.
This cell may run up to ~15 mins.</style>
# The area_length variable here represents how large the area the users want to calculate.
area_length = 10
# Print the current time before the execution of the algorithm
print(datetime.datetime.now())
min_x = var1*100
min_y = var2*100
max_x = var1*100+area_length
max_y = var2*100+area_length
increment = 1
angle = var3
x_coord = min_x
y_coord = min_y
rt = []
while x_coord!=(max_x+1):
while y_coord!=(max_y+1):
# find the elevation of the current coordinate
curr_elevation = find_elevation_new(x_coord, y_coord)
curr_x = x_coord
curr_y = y_coord
# print("Starting Point:")
print((curr_x,curr_y))
curr_array = []
while ((curr_x>=min_x and curr_x<=max_x) and (curr_y>=min_y and curr_y<=max_y)):
# print((curr_x, curr_y, curr_elevation))
curr_array.append((curr_x, curr_y))
rt_x = curr_x
rt_y = curr_y
rt_elevation = curr_elevation
angel_diff = 0
# find the elevation of all candidate points
while(angel_diff<360):
new_x = curr_x+increment*math.sin(math.pi/180*angel_diff)
new_y = curr_y+increment*math.cos(math.pi/180*angel_diff)
new_elevation = find_elevation_new(new_x, new_y)
# print((angel_diff,new_x,new_y,new_elevation,rt_x,rt_y,rt_elevation))
if (new_elevation<rt_elevation):
rt_x = new_x
rt_y = new_y
rt_elevation = new_elevation
angel_diff = angel_diff+angle
# Check if the data is a pit or not
if (rt_elevation<curr_elevation):
curr_x = rt_x
curr_y = rt_y
curr_elevation = rt_elevation
else:
break
rt.append(curr_array)
# print("Result For:")
# print((x_coord,y_coord))
# print(curr_array)
y_coord=y_coord+1
y_coord = min_y
x_coord=x_coord+1
# Print the end time of the execution
print(datetime.datetime.now())
Print the simulated water flow
print(rt)
data = []
# Print the simulated waterflow
for i in range(0,area_length+1):
data.append([0]*(area_length+1))
for i in range(0,len(rt)):
for j in range(0,len(rt[i])):
data[int(math.floor(rt[i][j][0]))-var1*100][int(math.floor(rt[i][j][1]))-var2*100]=data[int(math.floor(rt[i][j][0]))-var1*100][int(math.floor(rt[i][j][1]))-var2*100]+1
The result after tracking the water direction
print(data)
Simple visulization of the result, with brighter area having more water flowing through
%matplotlib inline
import matplotlib.pyplot as plt
plt.imshow(data, interpolation='none')
plt.show()