1. 首页
  2. > 海外公司注册 >

各国及地区gsn网络代码表(asn自治系统编号是什么)


在处理全局位置元数据时,计算坐标点之间的地理距离通常很有用。困难在于在计算距离时必须考虑到地球的曲率。


数据集

在本例中,我们将使用全球历史气候网络(GHCN)每日观测数据集 。它是历史天气数据的数据集,涵盖了180个不同地区的100,000多个不同气象站的263年观测资料。有26.2亿观测值,构成了13.4GB的压缩数据。你可以从这里得到这个(https://www.ncdc.noaa.gov/ghcn-daily-description)。


提供的脚本用于Spark会话,具有8个内核,4个执行程序,4Gb主内存和4Gb工作内存。


用Haversine计算地理距离

Haversine是一个公式,它采用两个坐标点(纬度和经度)并在一个对象上生成第三个坐标点,以计算两个原始点之间的表面距离,同时考虑对象的曲率。它假定物体的形状(在我们的例子中是地球)是一个球体。我们知道地球实际上并不是一个完美的球体,但半径是一个常数可以给出相当准确的结果。


在本例中,我们将生成包含新西兰所有电台的haversine距离的元数据。我们将选择新西兰。


首先,我们需要导入必须的python库。


from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.types import * from pyspark.sql import functions as F from pyspark.sql import DataFrameWriter as W from math import radians, cos, sin, asin, sqrt spark = (SparkSession.builder .appName(‘’hdfs_Haversine_Fun”) .getOrCreate())

接下来,我们需要加载所需的数据。对于此示例,您只需要站点元数据。但是稍后当您想要进行自己的分析时,您可能希望使用所有GHCN文件。Python代码如下:


input_stations = (spark.read.format(“text”) .load(“hdfs:///data/ghcnd/stations”)) stations_df = input_stations.select( F.trim(F.substring(F.col(“value”), 1 , 11–1 1 )).alias(“STATION_ID”).cast(StringType()), F.trim(F.substring(F.col(“value”), 13, 20–13 1)).alias(“LATITUDE”).cast(DoubleType()), F.trim(F.substring(F.col(“value”), 22, 30–22 1)).alias(“LONGITUDE”).cast(DoubleType()), F.trim(F.substring(F.col(“value”), 32, 37–32 1)).alias(“ELEVATION”).cast(DoubleType()), F.trim(F.substring(F.col(“value”), 39, 40–39 1)).alias(“STATE_CODE”).cast(StringType()), F.trim(F.substring(F.col(“value”), 42, 71–42 1)).alias(“STATION_NAME”).cast(StringType()), F.trim(F.substring(F.col(“value”), 73, 75–73 1)).alias(“GSNFLAG”).cast(StringType()), F.trim(F.substring(F.col(“value”), 77, 79–77 1)).alias(“HCNFLAG_CRNFLAG”).cast(StringType()), F.trim(F.substring(F.col(“value”), 81, 85–81 1)).alias(“WMOID”).cast(StringType()) )

请注意,您不需要定义Schema,然后将其传递到单独的load语句中,因为您可以使用pyspark.sql.functions将数据解析为具有所需类型的新列。实际上,这比在文本文件中首先定义StructFields要快得多。


让我们来看看stations_df的前五个观察结果。


stations_df.show(5)


接下来,我们将添加一个名为COUNTRY_CODE的列,以便我们稍后可以过滤以仅访问我们最喜欢的国家/地区中的电台。我们应该总是将udf传递给实现结果所需的最少量信息,因为udfs在大型数据集上的计算成本往往很高。


我们可以看到STATION_ID的前两个字符是国家代码。所以我们只需要取两个前两个字符并将其添加为新列COUNTRY_CODE。


stations_df = stations_df.withColumn(‘COUNTRY_CODE’, stations_df.STATION_ID.substr(1, 2))

完成后,我们可以过滤并选择所需的列。


nz_stations = (stations_df .filter(stations_df.COUNTRY_CODE==”NZ”) .select(“STATION_ID”,“STATION_NAME”,“LATITUDE”, “LONGITUDE”))

现在我们可以开始Haversine函数了!为此,我们创建了一个标准的python函数,其中我们使用地球的半径为6371km,并返回distance rounded的绝对值为2dp。Python代码如下:


def get_distance(longit_a, latit_b, longit_b, latit_b): # Transform to radians longit_a, latit_b, longit_b, latit_b = map(radians, [longit_a, latit_b, longit_b, latit_b]) dist_longit = longit_b — longit_a dist_latit = latit_b — latit_a # Calculate area area = sin(dist_latit/2)**2 cos(latit_a) * sin(dist_longit/2)**2 # Calculate the central angle central_angle = 2 * asin(sqrt(area)) radius = 6371 # Calculate Distance distance = central_angle * radius return abs(round(distance, 2))

现在我们已经将它定义为python函数,我们可以创建一个用户定义的函数来在Spark DataFrame上使用它。用户定义函数允许我们在python或SQL中创建自定义函数,然后使用它们来操作Spark DataFrame中的列。


转换为UDF:


udf_get_distance = F.udf(get_distance)

现在,我们需要获取原始的nz_stations元数据,并将其与自身交叉连接,以支持列操作,并在过程中重命名列。


nz_station_pairs = (nz_stations.crossJoin(nz_stations).toDF( “STATION_ID_A”, “STATION_NAME_A”, “LATITUDE_A”, “LONGITUDE_A”, “STATION_ID_B”, “STATION_NAME_A”, “LATITUDE_B”, “LONGITUDE_B”))

然后通过删除重复的行来清理它。


nz_station_pairs = (nz_station_pairs.filter( nz_station_pairs.STATION_ID_A != nz_station_pairs.STATION_ID_B))

现在我们可以将我们的udf_get_distance函数应用于nz_station_pairs以添加新列ABS_DISTANCE。


nz_pairs_distance = (nz_station_pairs.withColumn(“ABS_DISTANCE”, udf_get_distance( nz_station_pairs.LONGITUDE_A, nz_station_pairs.LATITUDE_A, nz_station_pairs.LONGITUDE_B, nz_station_pairs.LATITUDE_B) ))

注意:在这种情况下,udf不会将输出作为float返回,而是创建一个新的列,distances 为字符串。我想这是因为withColumn恢复了Spark的默认值,这是一个字符串。


如果您遇到此问题,则可以通过添加新列来修改此值,该列是前一个的复制,但将新列强制转换为Double Type。您无法更改上一列,因为Spark DataFrames是不可变的。


nz_pairs_distance = nz_pairs_distance.withColumn(“DISTANCE_FLOAT”, nz_pairs_distance.ABS_DISTANCE.cast(DoubleType()))

现在您的分析中包含了haversine距离元数据!


nz_pairs_distance.show(7)


您现在可以将其写入本地hfs目录,以用于R或Python中的可视化。


W(nz_pairs_distance).csv(path=”hdfs:///YOUR_DIRECTORY”, mode=’ignore’, header=’true’)

建议

  • 确保在使用UDF时始终使用实现结果所需的最小数据。这是因为如果UDF包含多个列操作,则作业将包含许多任务。
  • 始终检查原始函数返回的所需对象数据类型是否与UDF的对象数据类型一致。您可以通过将新变量插入终端来轻松地在Spark中进行检查。
  • 您可以使用Spark的DataFrameWriter将特定元数据保存到HDFS目录,然后用于使用其他工具(例如R,Python等)进行分析。这可以通过以下方式复制到local director :

hdfs dfs -copyToLocal hdfs:///user/YOUR_DIRECTORY/YOUR_FILE.csv

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至123456@qq.com 举报,一经查实,本站将立刻删除。

联系我们

工作日:9:30-18:30,节假日休息