"""Plot the geographic distribution of a list of malicious IP addresses.

Reads one IP per line from ``IP_FILE``, resolves each against a MaxMind
GeoLite2 City database, prints the top countries/cities, and renders a world
map (country choropleth plus city hot-spot markers) to ``OUTPUT_CHART``.
"""
import os

# Memory optimisation: limit OpenMP worker threads. This has to happen before
# numpy / pandas / geopandas are imported, because native thread pools read
# the variable when their shared libraries are first loaded.
os.environ["OMP_NUM_THREADS"] = "1"

from collections import defaultdict  # noqa: E402

import geoip2.database  # noqa: E402
import geoip2.errors  # noqa: E402
import matplotlib  # noqa: E402,F401
import matplotlib.font_manager as fm  # noqa: E402
import matplotlib.pyplot as plt  # noqa: E402
import geopandas as gpd  # noqa: E402
import numpy as np  # noqa: E402
import pandas as pd  # noqa: E402
from adjustText import adjust_text  # noqa: E402

# Configuration
IP_FILE = 'malicious_ips.txt'
GEOIP_DB = 'GeoLite2-City.mmdb'
OUTPUT_CHART = 'ip_geo_distribution.png'
MAP_DATA_URL = 'https://naciscdn.org/naturalearth/10m/cultural/ne_10m_admin_0_countries.zip'

# Chinese font configuration: a Windows font on 'nt', a macOS font otherwise;
# if that file is missing, fall back to whatever matplotlib resolves for
# the generic sans-serif family.
FONT_PATH = 'C:/Windows/Fonts/msyh.ttc' if os.name == 'nt' else \
    '/System/Library/Fonts/Supplemental/Songti.ttc'
if not os.path.exists(FONT_PATH):
    FONT_PATH = fm.findfont(fm.FontProperties(family=['sans-serif']))


def _font(size):
    """Return FontProperties for the configured font file at *size* points."""
    return fm.FontProperties(fname=FONT_PATH, size=size)


def configure_matplotlib():
    """Make the configured font the preferred sans-serif font.

    The font is prepended to the existing ``font.sans-serif`` list so the
    default fallbacks stay available. Failures are reported on stdout and
    otherwise ignored (font setup is best-effort).
    """
    try:
        font_name = fm.FontProperties(fname=FONT_PATH).get_name()
        fallbacks = [
            name for name in plt.rcParams['font.sans-serif']
            if name != font_name
        ]
        plt.rcParams['font.sans-serif'] = [font_name, *fallbacks]
        # Render the minus sign with ASCII '-' instead of U+2212.
        plt.rcParams['axes.unicode_minus'] = False
    except Exception as e:
        print(f"字体配置异常: {e}")


def geo_lookup(ips, db_path):
    """Resolve IPs to country / city counts using a GeoIP2 City database.

    Args:
        ips: Iterable of IP address strings.
        db_path: Path to the ``.mmdb`` City database.

    Returns:
        A tuple ``(country_data, city_data, coord_data)`` where
        ``country_data`` maps country name -> hit count, ``city_data`` maps
        ``(city, country, lat, lon)`` (coordinates rounded to 2 decimals)
        -> hit count, and ``coord_data`` is a list of raw
        ``(longitude, latitude)`` pairs, one per IP that had city data.

    IPs that are malformed or absent from the database are skipped; the
    number skipped is printed. Other errors (e.g. an unreadable database)
    propagate to the caller.
    """
    city_data = defaultdict(int)
    country_data = defaultdict(int)
    coord_data = []
    skipped = 0

    with geoip2.database.Reader(db_path) as reader:
        for ip in ips:
            try:
                response = reader.city(ip)
            except (geoip2.errors.AddressNotFoundError, ValueError):
                # Unknown address or not a valid IP string: skip this entry.
                skipped += 1
                continue

            country = response.country.name
            city = response.city.name
            latitude = response.location.latitude
            longitude = response.location.longitude

            if country:
                country_data[country] += 1

            # Compare against None explicitly: 0.0 is a valid coordinate.
            if city and latitude is not None and longitude is not None:
                # Normalise coordinates to 2 decimal places for the key.
                city_key = (
                    city,
                    country,
                    round(latitude, 2),
                    round(longitude, 2),
                )
                city_data[city_key] += 1
                coord_data.append((longitude, latitude))

    if skipped:
        print(f"已跳过 {skipped} 个无法解析的IP地址")

    return country_data, city_data, coord_data


def create_city_layer(city_data):
    """Build a point GeoDataFrame of cities from ``geo_lookup`` city counts.

    Args:
        city_data: Mapping of ``(city, country, lat, lon)`` -> count.

    Returns:
        A GeoDataFrame with columns City, Country, Latitude, Longitude,
        Count and a point geometry, or ``None`` when there is nothing to
        plot.
    """
    # An empty mapping would yield a column-less DataFrame and make the
    # groupby below raise KeyError, so bail out first.
    if not city_data:
        return None

    city_df = pd.DataFrame([
        {
            'City': city,
            'Country': country,
            'Latitude': lat,
            'Longitude': lon,
            'Count': count,
        }
        for (city, country, lat, lon), count in city_data.items()
    ])

    # Merge entries of the same city recorded under different coordinates:
    # average the coordinates and sum the counts.
    city_df = city_df.groupby(['City', 'Country']).agg({
        'Latitude': 'mean',
        'Longitude': 'mean',
        'Count': 'sum',
    }).reset_index()

    # groupby drops rows whose key is missing (e.g. no country name), so the
    # frame can still end up empty here.
    if city_df.empty:
        return None

    return gpd.GeoDataFrame(
        city_df,
        geometry=gpd.points_from_xy(city_df.Longitude, city_df.Latitude),
    )


def visualize_combined(data):
    """Render the country choropleth plus city hot-spots to OUTPUT_CHART.

    Args:
        data: The ``(country_data, city_data, coords)`` tuple produced by
            ``geo_lookup``. ``coords`` is accepted but not used here.
    """
    configure_matplotlib()
    country_data, city_data, coords = data
    world = gpd.read_file(MAP_DATA_URL)

    # Country-name normalisation applied to the map's NAME column before the
    # join with the lookup results. Keys absent from NAME are no-ops.
    # NOTE(review): each value must equal the English country name returned
    # by the GeoIP database for the join to match - verify against the DB.
    name_mapping = {
        'United States of America': 'United States',
        'Russian Federation': 'Russia',
        'Iran (Islamic Republic of)': 'Iran',
        'Viet Nam': 'Vietnam',
        'Korea, Republic of': 'South Korea',
        'Hong Kong S.A.R.': 'Hong Kong',
        'Taiwan': 'Taiwan Province of China'
    }
    world['NAME'] = world['NAME'].replace(name_mapping)

    # Country level: left-join counts onto the map; unmatched countries get
    # NaN and are drawn with the "missing" style below.
    country_df = world.merge(
        pd.DataFrame.from_dict(country_data, orient='index', columns=['Country_Count']),
        how="left",
        left_on='NAME',
        right_index=True
    )

    # City level
    city_gdf = create_city_layer(city_data)

    fig, ax = plt.subplots(figsize=(24, 16))

    # Country choropleth
    country_df.plot(
        ax=ax,
        column='Country_Count',
        cmap='YlOrRd',
        edgecolor='black',
        linewidth=0.3,
        legend=True,
        legend_kwds={
            'label': "国家级别IP数量",
            'orientation': "horizontal",
            'shrink': 0.6
        },
        missing_kwds={"color": "lightgrey"}
    )

    # City hot-spot markers
    if city_gdf is not None and not city_gdf.empty:
        # Scale marker area linearly with the count.
        counts = city_gdf['Count']
        min_count, max_count = counts.min(), counts.max()

        if min_count == max_count:
            sizes = 100  # every point gets the same size
        else:
            sizes = np.interp(
                counts,
                (min_count, max_count),
                (20, 200)  # marker size range
            )

        city_gdf.plot(
            ax=ax,
            markersize=sizes,
            color='darkred',
            alpha=0.6,
            edgecolor='black',
            linewidth=0.3,
            marker='o',
            label='城市热点'
        )

        # Label the 20 largest hot-spots.
        label_font = _font(8)
        texts = [
            ax.text(
                row.geometry.x,
                row.geometry.y,
                f"{row['City']}\n{row['Count']}",
                fontproperties=label_font,
                ha='center',
                va='center',
                bbox=dict(
                    facecolor='white',
                    alpha=0.8,
                    edgecolor='none',
                    boxstyle='round,pad=0.2'
                )
            )
            for _, row in city_gdf.nlargest(20, 'Count').iterrows()
        ]

        # Nudge labels apart so they do not overlap.
        adjust_text(
            texts,
            arrowprops=dict(
                arrowstyle='-',
                color='gray',
                lw=0.5
            ),
            ax=ax
        )

    # Data-source attribution
    ax.annotate(
        '数据来源: MaxMind GeoLite2',
        xy=(0.68, 0.04),
        xycoords='figure fraction',
        fontproperties=_font(9)
    )

    # Title and output. Use the explicit axes/figure rather than pyplot's
    # implicit "current" objects so the calls cannot land on another axes.
    ax.set_title(
        '全球恶意IP分布热力图(国家/城市层级)',
        fontproperties=_font(22),
        pad=20
    )
    ax.set_axis_off()
    fig.savefig(OUTPUT_CHART, dpi=400, bbox_inches='tight')
    plt.close(fig)


def main():
    """Load the IP list, print top-10 statistics and write the chart."""
    # Load the IP list, ignoring blank lines.
    with open(IP_FILE, 'r', encoding='utf-8') as f:
        ips = [line.strip() for line in f if line.strip()]
    print(f"成功加载 {len(ips)} 个IP地址")

    # Geo lookup
    country_data, city_data, coords = geo_lookup(ips, GEOIP_DB)

    # Print statistics
    print("\n国家统计TOP10:")
    for country, count in sorted(country_data.items(), key=lambda x: x[1], reverse=True)[:10]:
        print(f"{country}: {count}")

    print("\n城市统计TOP10:")
    sorted_cities = sorted(
        city_data.items(),
        key=lambda x: x[1],
        reverse=True
    )[:10]
    for (city, country, _, _), count in sorted_cities:
        print(f"{country}-{city}: {count}")

    # Render the chart
    visualize_combined((country_data, city_data, coords))
    print(f"\n可视化结果已保存至 {OUTPUT_CHART}")


if __name__ == '__main__':
    main()