fuck-ip/v2-cidr.py
2025-02-11 15:12:20 +08:00

317 lines
9.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import geoip2.database
import matplotlib.pyplot as plt
import geopandas as gpd
import pandas as pd
import numpy as np
import ipaddress
import os
import sys
from collections import defaultdict
import matplotlib.font_manager as fm
from adjustText import adjust_text
import warnings
# --- Input / output locations -------------------------------------------
IP_FILE = 'malicious_ips.txt'
GEOIP_DB = 'GeoLite2-City.mmdb'
OUTPUT_CHART = 'ip_geo_distribution.png'
MAP_DATA_URL = 'https://naciscdn.org/naturalearth/10m/cultural/ne_10m_admin_0_countries.zip'

# --- Performance tuning -------------------------------------------------
# Original author's note: limiting OpenMP to one thread silences a
# memory-leak warning.
os.environ["OMP_NUM_THREADS"] = "1"
# Upper bound on the number of IP lookups kept in the GeoIP cache.
MAX_CACHE_SIZE = 10000

# --- Chinese font selection ---------------------------------------------
# Start from a well-known CJK font location for the current platform ...
if os.name == 'nt':
    FONT_PATH = 'C:/Windows/Fonts/msyh.ttc'
else:
    FONT_PATH = '/System/Library/Fonts/Supplemental/Songti.ttc'
# ... and fall back to whatever Matplotlib resolves for "sans-serif" when
# that file is not present on this machine.
if not os.path.exists(FONT_PATH):
    FONT_PATH = fm.findfont(fm.FontProperties(family=['sans-serif']))
def configure_matplotlib():
    """Configure Matplotlib to render text with the font at ``FONT_PATH``.

    Sets the ``font.sans-serif`` rcParam to the family name read from the
    font file and disables ``axes.unicode_minus``. Any failure is reported
    as a warning instead of being raised, so plotting can still proceed
    with Matplotlib's defaults.
    """
    try:
        chosen_font = fm.FontProperties(fname=FONT_PATH)
        plt.rcParams['font.sans-serif'] = chosen_font.get_name()
        plt.rcParams['axes.unicode_minus'] = False
    except Exception as exc:
        warnings.warn(f"字体配置异常: {str(exc)}")
def cidr_to_ips(cidr, max_samples=None):
    """Yield sampled host addresses (as strings) from a CIDR block.

    Large networks are sampled rather than fully enumerated:

    * IPv4 with a prefix of /20 or shorter: every ``2 ** (24 - prefixlen)``-th
      host is yielded.
    * IPv6 with a prefix of /48 or shorter: every ``2 ** (64 - prefixlen)``-th
      host is yielded.
    * Anything more specific is enumerated host by host.

    Sampled addresses are computed directly from their offset inside the
    network, so the cost is proportional to the number of addresses yielded
    rather than to the size of the network.

    Args:
        cidr: Network in CIDR notation. Surrounding whitespace is ignored and
            host bits may be set (``strict=False``).
        max_samples: Optional upper bound on the number of addresses yielded.
            ``None`` (the default) means no bound. Useful for IPv6, where
            even the sampled sequence can be astronomically long.

    Yields:
        str: Host addresses in ascending order.

    Warns:
        UserWarning: If ``cidr`` cannot be parsed; nothing is yielded then.
    """
    try:
        network = ipaddress.ip_network(cidr.strip(), strict=False)
    except ValueError as e:
        warnings.warn(f"无效CIDR格式: {cidr} - {str(e)}")
        return

    # Sampling stride: 1 means "take every host".
    if network.version == 4:
        sampling_limit, reference_prefix = 20, 24
    else:
        sampling_limit, reference_prefix = 48, 64
    if network.prefixlen <= sampling_limit:
        step = 2 ** (reference_prefix - network.prefixlen)
    else:
        step = 1

    if network.prefixlen >= network.max_prefixlen - 1:
        # Point-to-point and single-address networks (/31, /32, /127, /128)
        # have special host rules; they are tiny and never sampled, so let
        # the standard library enumerate them.
        hosts = network.hosts()
    else:
        # hosts() skips the network address for both families and, for IPv4
        # only, the broadcast address as well. Host i therefore sits at
        # offset i + 1 from the network address.
        end_offset = network.num_addresses
        if network.version == 4:
            end_offset -= 1
        base = network.network_address
        hosts = (base + offset for offset in range(1, end_offset, step))

    emitted = 0
    for ip in hosts:
        if max_samples is not None and emitted >= max_samples:
            break
        yield str(ip)
        emitted += 1
def load_ips(filename):
    """Stream IP addresses from a UTF-8 text file, one entry per line.

    Blank lines are skipped. An entry containing ``/`` is treated as a CIDR
    block and expanded through ``cidr_to_ips``; any other entry must parse
    as a single IP address and is yielded unchanged (after stripping
    whitespace). Entries that fail to parse produce a warning and are
    dropped.

    Args:
        filename: Path of the text file to read.

    Yields:
        str: IP addresses in file order.
    """
    with open(filename, 'r', encoding='utf-8') as source:
        for raw_line in source:
            entry = raw_line.strip()
            if entry == '':
                continue

            # CIDR notation: delegate expansion to the CIDR generator.
            if '/' in entry:
                yield from cidr_to_ips(entry)
                continue

            # Plain address: validate first, then pass it through verbatim.
            try:
                ipaddress.ip_address(entry)
            except ValueError:
                warnings.warn(f"忽略无效IP: {entry}")
            else:
                yield entry
class GeoIPCache:
    """GeoIP city lookups backed by a bounded least-recently-used cache.

    Usable as a context manager; the underlying database reader is closed
    on exit.

    Attributes:
        reader: The open ``geoip2.database.Reader``.
        cache: ``OrderedDict`` mapping IP string -> result tuple, ordered
            from least to most recently used.
        max_size: Maximum number of entries kept in ``cache``.
    """

    def __init__(self, db_path, max_size=None):
        """Open the database at ``db_path``.

        Args:
            db_path: Path to the MaxMind ``.mmdb`` database file.
            max_size: Cache capacity; defaults to ``MAX_CACHE_SIZE``.
        """
        # Imported here so the class does not depend on a file-level import.
        from collections import OrderedDict

        self.reader = geoip2.database.Reader(db_path)
        self.max_size = MAX_CACHE_SIZE if max_size is None else max_size
        self.cache = OrderedDict()

    @property
    def lru(self):
        """Cached IPs ordered from most to least recently used (read-only)."""
        return list(reversed(self.cache))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def lookup(self, ip):
        """Return ``(country, city, latitude, longitude)`` for ``ip``.

        Missing country or city names are reported as ``'Unknown'``.
        Coordinates are rounded to two decimals, or ``None`` when the record
        carries no location. If the lookup itself fails for any reason the
        result is ``('Unknown', 'Unknown', None, None)``.

        Args:
            ip: IP address as a string.

        Returns:
            tuple: ``(country_name, city_name, latitude, longitude)``.
        """
        if ip in self.cache:
            # Cache hit: mark as most recently used in O(1).
            self.cache.move_to_end(ip)
            return self.cache[ip]

        try:
            response = self.reader.city(ip)
            latitude = response.location.latitude
            longitude = response.location.longitude
            result = (
                response.country.name or 'Unknown',
                response.city.name or 'Unknown',
                # A record may have names but no coordinates; rounding None
                # would raise and throw away the names as well.
                None if latitude is None else round(latitude, 2),
                None if longitude is None else round(longitude, 2),
            )
        except Exception:
            # Best effort: unknown addresses or reader errors must not abort
            # a bulk run.
            result = ('Unknown', 'Unknown', None, None)

        self.cache[ip] = result
        if len(self.cache) > self.max_size:
            # Evict the least recently used entry.
            self.cache.popitem(last=False)
        return result

    def close(self):
        """Close the underlying database reader."""
        self.reader.close()
def geo_lookup(ips):
    """Resolve each IP to a location and tally the results.

    Args:
        ips: Iterable of IP address strings.

    Returns:
        tuple: ``(country_data, city_data, coord_data)`` where

        * ``country_data`` maps country name -> number of IPs,
        * ``city_data`` maps ``(city, country, lat, lon)`` -> number of IPs,
        * ``coord_data`` is a list of ``(lon, lat)`` pairs, one per IP that
          resolved to a known city with coordinates.
    """
    country_data = defaultdict(int)
    city_data = defaultdict(int)
    coord_data = []

    with GeoIPCache(GEOIP_DB) as cache:
        for ip in ips:
            country, city, lat, lon = cache.lookup(ip)

            if country and country != 'Unknown':
                country_data[country] += 1

            city_known = bool(city) and city != 'Unknown'
            # Compare against None explicitly: 0.0 is a valid coordinate
            # (equator / prime meridian) and must not be treated as missing.
            if city_known and lat is not None and lon is not None:
                city_data[(city, country, lat, lon)] += 1
                coord_data.append((lon, lat))

    return country_data, city_data, coord_data
def create_city_layer(city_data):
    """Build a point layer with one row per (city, country).

    Entries that share a city and country but differ in coordinates are
    merged: their counts are summed and their coordinates averaged.

    Args:
        city_data: Mapping of ``(city, country, lat, lon)`` -> count.

    Returns:
        A ``GeoDataFrame`` with columns ``City``, ``Country``, ``Latitude``,
        ``Longitude``, ``Count`` and point geometry, or ``None`` when there
        is nothing to plot.
    """
    # Without rows the frame has no 'City'/'Country' columns and the
    # groupby below would raise KeyError, so bail out first.
    if not city_data:
        return None

    records = [
        {
            'City': city,
            'Country': country,
            'Latitude': lat,
            'Longitude': lon,
            'Count': count,
        }
        for (city, country, lat, lon), count in city_data.items()
    ]
    city_df = pd.DataFrame(records)

    # Merge rows for the same city that carry different coordinates.
    city_df = city_df.groupby(['City', 'Country']).agg({
        'Latitude': 'mean',
        'Longitude': 'mean',
        'Count': 'sum',
    }).reset_index()

    if city_df.empty:
        return None
    return gpd.GeoDataFrame(
        city_df,
        geometry=gpd.points_from_xy(city_df.Longitude, city_df.Latitude),
    )
def _merge_country_counts(country_data):
    """Load the country polygons from MAP_DATA_URL and attach per-country counts."""
    world = gpd.read_file(MAP_DATA_URL)

    # Rename map-side country names so they line up with the keys of
    # ``country_data``.
    # NOTE(review): each right-hand value must equal the name the GeoIP
    # database actually returns, otherwise that country never joins —
    # verify the entries against real lookup output.
    name_mapping = {
        'United States of America': 'United States',
        'Russian Federation': 'Russia',
        'Iran (Islamic Republic of)': 'Iran',
        'Viet Nam': 'Vietnam',
        'Korea, Republic of': 'South Korea',
        'Hong Kong S.A.R.': 'Hong Kong',
        'Taiwan': 'Taiwan Province of China'
    }
    world['NAME'] = world['NAME'].replace(name_mapping)

    counts = pd.DataFrame.from_dict(
        country_data, orient='index', columns=['Country_Count']
    )
    # Left join keeps every country; those without data get NaN counts.
    return world.merge(counts, how="left", left_on='NAME', right_index=True)


def _draw_city_layer(ax, city_gdf):
    """Plot city markers scaled by count and label the 20 largest cities."""
    counts = city_gdf['Count']
    min_count, max_count = counts.min(), counts.max()
    # Scale marker area linearly between 20 and 200; when every city has the
    # same count there is no range to interpolate over, so use a fixed size.
    if min_count != max_count:
        sizes = np.interp(counts, (min_count, max_count), (20, 200))
    else:
        sizes = 100

    city_gdf.plot(
        ax=ax,
        markersize=sizes,
        color='darkred',
        alpha=0.6,
        edgecolor='black',
        linewidth=0.3,
        marker='o'
    )

    label_font = fm.FontProperties(fname=FONT_PATH, size=8)
    texts = [
        ax.text(
            row.geometry.x,
            row.geometry.y,
            f"{row['City']}\n{row['Count']}",
            fontproperties=label_font,
            ha='center',
            va='center',
            bbox=dict(
                facecolor='white',
                alpha=0.8,
                edgecolor='none',
                boxstyle='round,pad=0.2'
            )
        )
        for _, row in city_gdf.nlargest(20, 'Count').iterrows()
    ]
    # Nudge overlapping labels apart and connect them to their points.
    adjust_text(
        texts,
        arrowprops=dict(
            arrowstyle='-',
            color='gray',
            lw=0.5
        ),
        ax=ax
    )


def visualize_distribution(data):
    """Render the country choropleth plus city markers to ``OUTPUT_CHART``.

    Args:
        data: ``(country_data, city_data, coords)`` as returned by
            ``geo_lookup``. ``coords`` is accepted for compatibility but not
            used by the rendering.

    Raises:
        Exception: Any error raised while loading the map or rendering is
            printed and then re-raised unchanged.
    """
    configure_matplotlib()
    country_data, city_data, _coords = data

    try:
        country_df = _merge_country_counts(country_data)

        fig, ax = plt.subplots(figsize=(24, 16))
        try:
            # Country layer (choropleth).
            country_df.plot(
                ax=ax,
                column='Country_Count',
                cmap='YlOrRd',
                edgecolor='#333333',
                linewidth=0.5,
                legend=True,
                legend_kwds={
                    'label': "国家级别IP数量",
                    'orientation': "horizontal",
                    'shrink': 0.5
                },
                missing_kwds={"color": "lightgrey"}
            )

            # City layer; skipped entirely when no city resolved.
            city_gdf = create_city_layer(city_data) if city_data else None
            if city_gdf is not None and not city_gdf.empty:
                _draw_city_layer(ax, city_gdf)

            ax.annotate(
                '数据来源: MaxMind GeoLite2',
                xy=(0.72, 0.04),
                xycoords='figure fraction',
                fontproperties=fm.FontProperties(fname=FONT_PATH, size=9)
            )
            ax.set_title(
                '全球恶意IP分布热力图(国家/城市层级)',
                fontproperties=fm.FontProperties(fname=FONT_PATH, size=22),
                pad=20
            )
            ax.axis('off')
            fig.savefig(OUTPUT_CHART, dpi=400, bbox_inches='tight')
        finally:
            # Release the figure even when rendering fails part-way.
            plt.close(fig)
    except Exception as e:
        print(f"地图渲染失败: {str(e)}")
        raise
if __name__ == '__main__':
    # Suppress every UserWarning for the duration of the run.
    warnings.filterwarnings("ignore", category=UserWarning)

    # Step 1: read and expand the address list.
    loaded_ips = list(load_ips(IP_FILE))
    print(f"成功加载 {len(loaded_ips)} 个IP地址")

    # Step 2: resolve every address to a location.
    by_country, by_city, coordinates = geo_lookup(loaded_ips)

    # Step 3: print the ten most frequent countries and cities.
    print("\n国家统计TOP10:")
    country_ranking = sorted(
        by_country.items(), key=lambda entry: entry[1], reverse=True
    )
    for country_name, total in country_ranking[:10]:
        print(f"{country_name}: {total}")

    print("\n城市统计TOP10:")
    city_ranking = sorted(
        by_city.items(), key=lambda entry: entry[1], reverse=True
    )
    for city_key, total in city_ranking[:10]:
        city_name, country_name, _, _ = city_key
        print(f"{country_name}-{city_name}: {total}")

    # Step 4: draw the map.
    visualize_distribution((by_country, by_city, coordinates))
    print(f"\n可视化结果已保存至 {OUTPUT_CHART}")