fuck-ip/v2_city.py
2025-02-11 15:12:20 +08:00

245 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import geoip2.database
import matplotlib
import matplotlib.pyplot as plt
import geopandas as gpd
import pandas as pd
import numpy as np
import os
from collections import defaultdict
import matplotlib.font_manager as fm
from adjustText import adjust_text
# Configuration parameters
IP_FILE = 'malicious_ips.txt'              # input list, read line by line by the script entry point
GEOIP_DB = 'GeoLite2-City.mmdb'            # database file handed to geoip2.database.Reader
OUTPUT_CHART = 'ip_geo_distribution.png'   # file the rendered figure is saved to
MAP_DATA_URL = 'https://naciscdn.org/naturalearth/10m/cultural/ne_10m_admin_0_countries.zip'

# Memory optimisation setting (original author's note): export OMP_NUM_THREADS=1.
# NOTE(review): this runs after numpy has already been imported — confirm it still takes effect.
os.environ["OMP_NUM_THREADS"] = "1"

# Chinese font configuration: a preset font file for Windows, another for every other OS.
if os.name == 'nt':
    FONT_PATH = 'C:/Windows/Fonts/msyh.ttc'
else:
    FONT_PATH = '/System/Library/Fonts/Supplemental/Songti.ttc'

if not os.path.exists(FONT_PATH):
    # Preset file is missing: fall back to whatever font file matplotlib
    # resolves for the generic sans-serif family.
    FONT_PATH = fm.findfont(fm.FontProperties(family=['sans-serif']))
def configure_matplotlib():
    """Configure Matplotlib to render text with the font file at FONT_PATH.

    Sets the ``font.sans-serif`` rcParam to the name of that font and turns
    off ``axes.unicode_minus``. Best-effort: any exception is printed to
    stdout and otherwise ignored.
    """
    try:
        # Resolve the family name first; if this fails nothing is changed.
        font_name = fm.FontProperties(fname=FONT_PATH).get_name()
        plt.rcParams['font.sans-serif'] = font_name
        plt.rcParams['axes.unicode_minus'] = False
    except Exception as exc:
        print(f"字体配置异常: {exc}")
def geo_lookup(ips, db_path):
    """Look up geographic information for each IP, down to city level.

    Args:
        ips: Iterable of IP address strings.
        db_path: Path of the database file opened with
            ``geoip2.database.Reader``.

    Returns:
        A ``(country_data, city_data, coord_data)`` tuple where

        * ``country_data`` is a ``defaultdict(int)`` mapping country name to
          the number of IPs resolved to it;
        * ``city_data`` is a ``defaultdict(int)`` mapping
          ``(city, country, latitude, longitude)`` to a count, with the
          coordinates rounded to 2 decimals;
        * ``coord_data`` is a list of unrounded ``(longitude, latitude)``
          tuples, one per IP that resolved to a city.

    Addresses that are malformed or not present in the database are skipped.
    Any other error (for example a database of the wrong type) propagates
    instead of silently yielding empty results.
    """
    # Function-scope import so the block does not depend on a file-level change.
    from geoip2.errors import AddressNotFoundError

    city_data = defaultdict(int)
    country_data = defaultdict(int)
    coord_data = []

    with geoip2.database.Reader(db_path) as reader:
        for ip in ips:
            try:
                response = reader.city(ip)
            except (AddressNotFoundError, ValueError):
                # Expected per-address failures: unknown or malformed IP.
                continue

            country = response.country.name
            city = response.city.name
            latitude = response.location.latitude
            longitude = response.location.longitude

            if country:
                country_data[country] += 1

            # Compare against None explicitly: 0.0 is a valid coordinate and
            # must not be discarded by a truthiness test.
            if city and latitude is not None and longitude is not None:
                # Normalise coordinates to 2 decimals so near-identical
                # points for the same city share one key.
                city_key = (
                    city,
                    country,
                    round(latitude, 2),
                    round(longitude, 2),
                )
                city_data[city_key] += 1
                coord_data.append((longitude, latitude))

    return country_data, city_data, coord_data
def create_city_layer(city_data):
    """Build the city point layer from per-city counts.

    Args:
        city_data: Mapping of ``(city, country, latitude, longitude)`` to a
            count.

    Returns:
        A GeoDataFrame with ``City``, ``Country``, ``Latitude``,
        ``Longitude``, ``Count`` and point ``geometry`` columns, one row per
        ``(City, Country)`` pair, or ``None`` when there is nothing to plot.
    """
    if not city_data:
        # A DataFrame built from no rows has no 'City'/'Country' columns, so
        # the groupby below would raise KeyError; bail out first.
        return None

    rows = [
        {
            'City': city,
            'Country': country,
            'Latitude': lat,
            'Longitude': lon,
            'Count': count,
        }
        for (city, country, lat, lon), count in city_data.items()
    ]
    city_df = pd.DataFrame(rows)

    # Merge entries of the same city that differ only in coordinates:
    # average the coordinates and sum the counts.
    city_df = city_df.groupby(['City', 'Country']).agg({
        'Latitude': 'mean',
        'Longitude': 'mean',
        'Count': 'sum',
    }).reset_index()

    if city_df.empty:
        return None

    return gpd.GeoDataFrame(
        city_df,
        geometry=gpd.points_from_xy(city_df.Longitude, city_df.Latitude),
    )
def _city_marker_sizes(counts):
    """Map per-city counts linearly onto marker sizes between 20 and 200."""
    min_count, max_count = counts.min(), counts.max()
    if min_count == max_count:
        # Every city has the same count: use one uniform size.
        return 100
    return np.interp(counts, (min_count, max_count), (20, 200))


def _label_top_cities(ax, city_gdf, limit=20):
    """Label the ``limit`` highest-count cities and spread overlapping labels."""
    label_font = fm.FontProperties(fname=FONT_PATH, size=8)
    texts = [
        ax.text(
            row.geometry.x,
            row.geometry.y,
            f"{row['City']}\n{row['Count']}",
            fontproperties=label_font,
            ha='center',
            va='center',
            bbox=dict(
                facecolor='white',
                alpha=0.8,
                edgecolor='none',
                boxstyle='round,pad=0.2',
            ),
        )
        for _, row in city_gdf.nlargest(limit, 'Count').iterrows()
    ]
    # Nudge labels apart so they do not overlap.
    adjust_text(
        texts,
        arrowprops=dict(
            arrowstyle='-',
            color='gray',
            lw=0.5,
        ),
        ax=ax,
    )


def visualize_combined(data):
    """Render the combined country/city map and save it to OUTPUT_CHART.

    Args:
        data: ``(country_data, city_data, coords)`` tuple. ``country_data``
            maps country name to count, ``city_data`` maps
            ``(city, country, latitude, longitude)`` to count. The third
            element is accepted but not used.

    Side effects:
        Reads the country polygons from ``MAP_DATA_URL`` and writes the
        figure to ``OUTPUT_CHART``. The figure is closed even when plotting
        or saving raises.
    """
    configure_matplotlib()
    country_data, city_data, _coords = data

    world = gpd.read_file(MAP_DATA_URL)

    # Country-name normalisation applied to the map's NAME column before it
    # is joined with the country names used as keys of country_data.
    # NOTE(review): each value must equal the name found in country_data for
    # the join to match — verify the entries against the GeoIP database.
    name_mapping = {
        'United States of America': 'United States',
        'Russian Federation': 'Russia',
        'Iran (Islamic Republic of)': 'Iran',
        'Viet Nam': 'Vietnam',
        'Korea, Republic of': 'South Korea',
        'Hong Kong S.A.R.': 'Hong Kong',
        'Taiwan': 'Taiwan Province of China'
    }
    world['NAME'] = world['NAME'].replace(name_mapping)

    # Country level: left-join so countries without hits stay on the map.
    country_counts = pd.DataFrame.from_dict(
        country_data, orient='index', columns=['Country_Count']
    )
    country_df = world.merge(
        country_counts,
        how="left",
        left_on='NAME',
        right_index=True,
    )

    # City level: skip the layer when there is no city data at all.
    city_gdf = create_city_layer(city_data) if city_data else None

    fig, ax = plt.subplots(figsize=(24, 16))
    try:
        # Country choropleth layer.
        country_df.plot(
            ax=ax,
            column='Country_Count',
            cmap='YlOrRd',
            edgecolor='black',
            linewidth=0.3,
            legend=True,
            legend_kwds={
                'label': "国家级别IP数量",
                'orientation': "horizontal",
                'shrink': 0.6
            },
            missing_kwds={"color": "lightgrey"},
        )

        # City hot-spot layer.
        if city_gdf is not None and not city_gdf.empty:
            city_gdf.plot(
                ax=ax,
                markersize=_city_marker_sizes(city_gdf['Count']),
                color='darkred',
                alpha=0.6,
                edgecolor='black',
                linewidth=0.3,
                marker='o',
                label='城市热点',
            )
            _label_top_cities(ax, city_gdf)

        # Data-source note.
        ax.annotate(
            '数据来源: MaxMind GeoLite2',
            xy=(0.68, 0.04),
            xycoords='figure fraction',
            fontproperties=fm.FontProperties(fname=FONT_PATH, size=9),
        )

        # Title (opening parenthesis restored) and output.
        ax.set_title(
            '全球恶意IP分布热力图(国家/城市层级)',
            fontproperties=fm.FontProperties(fname=FONT_PATH, size=22),
            pad=20,
        )
        ax.axis('off')
        fig.savefig(OUTPUT_CHART, dpi=400, bbox_inches='tight')
    finally:
        # Close this specific figure even if plotting or saving failed.
        plt.close(fig)
if __name__ == '__main__':
    # Load the IP list, dropping blank lines and surrounding whitespace.
    with open(IP_FILE, 'r', encoding='utf-8') as f:
        ips = [entry for entry in (raw.strip() for raw in f) if entry]
    print(f"成功加载 {len(ips)} 个IP地址")

    # Run the geographic lookup.
    country_data, city_data, coords = geo_lookup(ips, GEOIP_DB)

    # Print the ten most frequent countries.
    print("\n国家统计TOP10:")
    top_countries = sorted(
        country_data.items(),
        key=lambda kv: kv[1],
        reverse=True,
    )[:10]
    for country, count in top_countries:
        print(f"{country}: {count}")

    # Print the ten most frequent cities.
    print("\n城市统计TOP10:")
    sorted_cities = sorted(
        city_data.items(),
        key=lambda kv: kv[1],
        reverse=True,
    )[:10]
    for (city, country, _, _), count in sorted_cities:
        print(f"{country}-{city}: {count}")

    # Render and save the chart.
    visualize_combined((country_data, city_data, coords))
    print(f"\n可视化结果已保存至 {OUTPUT_CHART}")