import geoip2.database
import matplotlib.pyplot as plt
import geopandas as gpd
import pandas as pd
import numpy as np
import ipaddress
import os
import sys
from collections import defaultdict
import matplotlib.font_manager as fm
from adjustText import adjust_text
import warnings

# Configuration
IP_FILE = 'malicious_ips.txt'
GEOIP_DB = 'GeoLite2-City.mmdb'
OUTPUT_CHART = 'ip_geo_distribution.png'
MAP_DATA_URL = 'https://naciscdn.org/naturalearth/10m/cultural/ne_10m_admin_0_countries.zip'

# Performance tuning
os.environ["OMP_NUM_THREADS"] = "1"  # works around a memory-leak warning
MAX_CACHE_SIZE = 10000  # cache the 10,000 most recently queried IPs

# Chinese font configuration
FONT_PATH = 'C:/Windows/Fonts/msyh.ttc' if os.name == 'nt' else \
    '/System/Library/Fonts/Supplemental/Songti.ttc'
if not os.path.exists(FONT_PATH):
    FONT_PATH = fm.findfont(fm.FontProperties(family=['sans-serif']))


def configure_matplotlib():
    """Configure a Chinese-capable font for Matplotlib."""
    try:
        plt.rcParams['font.sans-serif'] = [fm.FontProperties(fname=FONT_PATH).get_name()]
        plt.rcParams['axes.unicode_minus'] = False
    except Exception as e:
        warnings.warn(f"Font configuration failed: {str(e)}")


def cidr_to_ips(cidr):
    """Generator that expands a CIDR block, sampling large networks."""
    try:
        network = ipaddress.ip_network(cidr.strip(), strict=False)
        # Sample large networks instead of enumerating every host
        if network.version == 4:
            if network.prefixlen <= 20:
                step = 2 ** (24 - network.prefixlen)
            else:
                step = 1
        else:  # IPv6
            if network.prefixlen <= 48:
                step = 2 ** (64 - network.prefixlen)
            else:
                step = 1
        count = 0
        for ip in network.hosts():
            if count % step == 0:
                yield str(ip)
            count += 1
    except ValueError as e:
        warnings.warn(f"Invalid CIDR format: {cidr} - {str(e)}")


def load_ips(filename):
    """IP loader that accepts both single addresses and CIDR blocks."""
    with open(filename, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            # CIDR notation
            if '/' in line:
                yield from cidr_to_ips(line)
            # Single IP address
            else:
                try:
                    ipaddress.ip_address(line)
                    yield line
                except ValueError:
                    warnings.warn(f"Ignoring invalid IP: {line}")


class GeoIPCache:
    """GeoIP lookup helper with an LRU cache; usable as a context manager."""

    def __init__(self, db_path):
        self.reader = geoip2.database.Reader(db_path)
        self.cache = {}
        self.lru = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def lookup(self, ip):
        if ip in self.cache:
            self.lru.remove(ip)
            self.lru.insert(0, ip)
            return self.cache[ip]
        try:
            response = self.reader.city(ip)
            location = response.location
            result = (
                response.country.name or 'Unknown',
                response.city.name or 'Unknown',
                round(location.latitude, 2) if location.latitude is not None else None,
                round(location.longitude, 2) if location.longitude is not None else None
            )
        except Exception:
            result = ('Unknown', 'Unknown', None, None)
        # Update the cache
        self.cache[ip] = result
        self.lru.insert(0, ip)
        if len(self.lru) > MAX_CACHE_SIZE:
            old_ip = self.lru.pop()
            del self.cache[old_ip]
        return result

    def close(self):
        self.reader.close()


def geo_lookup(ips):
    """Run geolocation lookups and aggregate counts by country and city."""
    country_data = defaultdict(int)
    city_data = defaultdict(int)
    coord_data = []
    with GeoIPCache(GEOIP_DB) as cache:  # the context manager closes the reader
        for ip in ips:
            country, city, lat, lon = cache.lookup(ip)
            if country and country != 'Unknown':
                country_data[country] += 1
            if city and city != 'Unknown' and lat is not None and lon is not None:
                city_key = (city, country, lat, lon)
                city_data[city_key] += 1
                coord_data.append((lon, lat))
    return country_data, city_data, coord_data


def create_city_layer(city_data):
    """Build the city-level point layer as a GeoDataFrame."""
    cities = []
    for (city, country, lat, lon), count in city_data.items():
        cities.append({
            'City': city,
            'Country': country,
            'Latitude': lat,
            'Longitude': lon,
            'Count': count
        })
    if not cities:
        return None
    city_df = pd.DataFrame(cities)
    # Merge entries for the same city that carry different coordinates
    city_df = city_df.groupby(['City', 'Country']).agg({
        'Latitude': 'mean',
        'Longitude': 'mean',
        'Count': 'sum'
    }).reset_index()
    if not city_df.empty:
        return gpd.GeoDataFrame(
            city_df,
            geometry=gpd.points_from_xy(city_df.Longitude, city_df.Latitude)
        )
    return None


def visualize_distribution(data):
    """Render the country/city distribution map."""
    configure_matplotlib()
    country_data, city_data, coords = data
    try:
        # Load the high-resolution world map
        world = gpd.read_file(MAP_DATA_URL)
        # Normalize country names to match GeoIP naming
        name_mapping = {
            'United States of America': 'United States',
            'Russian Federation': 'Russia',
            'Iran (Islamic Republic of)': 'Iran',
            'Viet Nam': 'Vietnam',
            'Korea, Republic of': 'South Korea',
            'Hong Kong S.A.R.': 'Hong Kong',
            'Taiwan': 'Taiwan Province of China'
        }
        world['NAME'] = world['NAME'].replace(name_mapping)
        # Merge country-level counts into the map data
        country_df = world.merge(
            pd.DataFrame.from_dict(country_data, orient='index', columns=['Country_Count']),
            how="left",
            left_on='NAME',
            right_index=True
        )
        # Create the figure
        fig, ax = plt.subplots(figsize=(24, 16))
        # Draw the country layer
        country_plot = country_df.plot(
            ax=ax,
            column='Country_Count',
            cmap='YlOrRd',
            edgecolor='#333333',
            linewidth=0.5,
            legend=True,
            legend_kwds={
                'label': "IP count per country",
                'orientation': "horizontal",
                'shrink': 0.5
            },
            missing_kwds={"color": "lightgrey"}
        )
        # Draw the city layer
        city_gdf = create_city_layer(city_data)
        if city_gdf is not None and not city_gdf.empty:
            counts = city_gdf['Count']
            min_count, max_count = counts.min(), counts.max()
            # Scale marker sizes by count
            sizes = np.interp(
                counts,
                (min_count, max_count),
                (20, 200)
            ) if min_count != max_count else 100
            city_gdf.plot(
                ax=ax,
                markersize=sizes,
                color='darkred',
                alpha=0.6,
                edgecolor='black',
                linewidth=0.3,
                marker='o'
            )
            # Label the top cities
            texts = []
            top_cities = city_gdf.nlargest(20, 'Count')
            for _, row in top_cities.iterrows():
                texts.append(ax.text(
                    row.geometry.x, row.geometry.y,
                    f"{row['City']}\n{row['Count']}",
                    fontproperties=fm.FontProperties(fname=FONT_PATH, size=8),
                    ha='center',
                    va='center',
                    bbox=dict(
                        facecolor='white',
                        alpha=0.8,
                        edgecolor='none',
                        boxstyle='round,pad=0.2'
                    )
                ))
            # Automatically de-overlap the labels
            adjust_text(
                texts,
                arrowprops=dict(
                    arrowstyle='-',
                    color='gray',
                    lw=0.5
                ),
                ax=ax
            )
        # Add a data-source note
        ax.annotate(
            'Data source: MaxMind GeoLite2',
            xy=(0.72, 0.04),
            xycoords='figure fraction',
            fontproperties=fm.FontProperties(fname=FONT_PATH, size=9)
        )
        plt.title(
            'Global Malicious IP Distribution Heatmap (Country/City Level)',
            fontproperties=fm.FontProperties(fname=FONT_PATH, size=22),
            pad=20
        )
        plt.axis('off')
        plt.savefig(OUTPUT_CHART, dpi=400, bbox_inches='tight')
        plt.close()
    except Exception as e:
        print(f"Map rendering failed: {str(e)}")
        raise


if __name__ == '__main__':
    warnings.filterwarnings("ignore", category=UserWarning)
    # Load the IP list
    ips = list(load_ips(IP_FILE))
    print(f"Loaded {len(ips)} IP addresses")
    # Geolocation lookups
    country_data, city_data, coords = geo_lookup(ips)
    # Print summary statistics
    print("\nTop 10 countries:")
    for country, count in sorted(country_data.items(), key=lambda x: x[1], reverse=True)[:10]:
        print(f"{country}: {count}")
    print("\nTop 10 cities:")
    sorted_cities = sorted(city_data.items(), key=lambda x: x[1], reverse=True)[:10]
    for (city, country, _, _), count in sorted_cities:
        print(f"{country}-{city}: {count}")
    # Generate the visualization
    visualize_distribution((country_data, city_data, coords))
    print(f"\nVisualization saved to {OUTPUT_CHART}")
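
# Illustrative example only (an assumption, not part of the original script):
# a possible malicious_ips.txt using documentation-reserved address ranges.
# load_ips() accepts one entry per line -- either a single IPv4/IPv6 address
# or a CIDR block -- skips blank lines, and warns about invalid entries:
#
#   203.0.113.7
#   198.51.100.0/24
#   2001:db8::/48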