- add 2 databases sources

- implement timezone fetch (and internal stuff for citites)
This commit is contained in:
cclecle
2022-07-04 17:35:18 +02:00
parent 97df5b6235
commit 3832dbe421
2 changed files with 368 additions and 103 deletions

View File

@@ -18,7 +18,7 @@ import tempfile
from git import Repo
import os
from pathlib import Path
from netaddr import IPSet,IPAddress
from netaddr import IPSet,IPAddress,IPRange,AddrFormatError
import asyncio
from concurrent.futures import ProcessPoolExecutor
from ipaddress import ip_address, IPv4Address
@@ -30,25 +30,65 @@ from asyncstdlib import lru_cache
from dns import resolver,reversename
import time
import re
import requests
import csv
import gzip
DEFAULT__Cache_Expiration_Second = 600
DEFAULT__Cache_Size_Entries = 2048
DEFAULT__IpDataSet_UpdatePeriod_Second = 3600
DEFAULT__Num_IpDataSet_Workers = 10
DEFAULT__IpDataSet_GitRepo_Address = "https://chacha.ddns.net/gitea/chacha/country-ip-blocks"
DEFAULT__EnableIPV6 = True
DEFAULT__IpDataSet_ipv4_subdir = "ipv4"
DEFAULT__IpDataSet_ipv6_subdir = "ipv6"
DEFAULT_onlyFR = False
DEFAULT__Cache_Expiration_Second = 600
DEFAULT__Cache_Size_Entries = 2048
DEFAULT__IpDataSet_UpdatePeriod_Second = 3600
DEFAULT__Num_IpDataSet_Workers = 10
DEFAULT__IpDataSet_GitRepo_Address = "https://chacha.ddns.net/gitea/chacha/country-ip-blocks"
DEFAULT__Ip2CountryDataSet_CSV_Addresses_IPv4 = "https://chacha.ddns.net/gitea/chacha/ip-location-db/raw/branch/master/geo-whois-asn-country/geo-whois-asn-country-ipv4-num.csv"
DEFAULT__Ip2CountryDataSet_CSV_Addresses_IPv6 = "https://chacha.ddns.net/gitea/chacha/ip-location-db/raw/branch/master/geo-whois-asn-country/geo-whois-asn-country-ipv6-num.csv"
DEFAULT__Ip2CountryDataSet_CSV_Addresses_IPv4_Cities = "https://chacha.ddns.net/gitea/chacha/ip-location-db/raw/branch/master/geolite2-city/geolite2-city-ipv4-num.csv.gz"
DEFAULT__Ip2CountryDataSet_CSV_Addresses_IPv6_Cities = "https://chacha.ddns.net/gitea/chacha/ip-location-db/raw/branch/master/geolite2-city/geolite2-city-ipv6-num.csv.gz"
DEFAULT__EnableIPV6 = True
DEFAULT__EnableFilesDb = True
DEFAULT__EnableCSVDb = True
DEFAULT__EnableCSVCitiesDb = True
DEFAULT__IpDataSet_ipv4_subdir = "ipv4"
DEFAULT__IpDataSet_ipv6_subdir = "ipv6"
DEFAULT_onlyFR = False
def processfile(_entry_path,_entry_code) :
#class extend to disable automatic-compact at every IP add(), rather doing it manually at the end with new ForceCompact()
class IPSet_nocompact(IPSet):
def __init__(self, iterable=None, flags=0):
super().__init__(iterable,flags)
self.bDisableCompact = True
def compact(self):
if not self.bDisableCompact:
return super().compact()
def _compact_single_network(self, added_network):
if not self.bDisableCompact:
return super()._compact_single_network(added_network)
def ForceCompact(self):
self.bDisableCompact = False
try:
res = self.compact()
return res
finally:
self.bDisableCompact = True
def processfile(_entry_path,*identifier) :
with open(_entry_path) as fp:
_set = IPSet()
_set = IPSet_nocompact()
for line in fp:
_set.add(line)
print("PARSE DONE({0})".format(_entry_path))
return _entry_code,_set
try:
_set.add(line)
except AddrFormatError:
print("[Warning] Wrong IP address format: {0}".format(line))
_set.ForceCompact()
print("FILE PARSE DONE ({0})".format(identifier))
return (_set,) + identifier
def processIPRangeList(ranges,*identifier) :
_set = IPSet_nocompact()
for range in ranges:
_set.add(range)
_set.ForceCompact()
print("IPRANGE PARSE DONE ({0})".format(identifier))
return (_set,) + identifier
def validIPAddress(IP: str) -> str:
try:
@@ -73,33 +113,39 @@ def timed_lru_cache(seconds: int, maxsize: int = None):
class ChaChaIPToCountryStorage:
def __init__(self,url=None,user:str=None,password=None):
self.ipv4set = dict()
self.ipv4setTz = dict() #timezones
self.ipv6set = dict()
self.ipv6setTz = dict() #timezones
self.tempdir = tempfile.TemporaryDirectory()
self.tempdir_url = self.tempdir.name + os.sep
self.ipv4_dir = os.path.join(self.tempdir.name,DEFAULT__IpDataSet_ipv4_subdir)
self.ipv6_dir = os.path.join(self.tempdir.name,DEFAULT__IpDataSet_ipv6_subdir)
self.DataSetlock = asyncio.Lock()
print("New dataset dir = " + self.tempdir_url)
self.gitrepo = None
if not url:
url = DEFAULT__IpDataSet_GitRepo_Address
print("init storage")
if DEFAULT__EnableFilesDb:
print("New dataset dir = " + self.tempdir_url)
if not url:
url = DEFAULT__IpDataSet_GitRepo_Address
m = re.match(r"(?P<proto>http(?:s?):\/\/)?(?P<url>.*)",url)
proto = m.group('proto')
right_url = m.group('url')
m = re.match(r"(?P<proto>http(?:s?):\/\/)?(?P<url>.*)",url)
proto = m.group('proto')
right_url = m.group('url')
full_url = proto
if user:
full_url += user
if password:
full_url += ":" + password
full_url += "@"
full_url += right_url
print("Cloning GIT repos: "+ full_url)
self.gitrepo = Repo.clone_from(full_url,self.tempdir_url )
print("Done")
full_url = proto
if user:
full_url += user
if password:
full_url += ":" + password
full_url += "@"
full_url += right_url
print("Cloning GIT repos: "+ full_url)
self.gitrepo = Repo.clone_from(full_url,self.tempdir_url )
print("Done")
asyncio.get_event_loop().run_until_complete(self.update_repo(True))
print("Finished")
@@ -109,97 +155,313 @@ class ChaChaIPToCountryStorage:
set = dict()
await self.DataSetlock.acquire()
if validity == "IPv4":
set = self.ipv4set
elif validity == "IPv6" and DEFAULT__EnableIPV6:
set = self.ipv6set
else:
RuntimeError("Invalid ip address received")
try:
await self.DataSetlock.acquire()
if validity == "IPv4":
set = self.ipv4set
setTz = self.ipv4setTz
elif validity == "IPv6" and DEFAULT__EnableIPV6:
set = self.ipv6set
setTz = self.ipv6setTz
else:
RuntimeError("Invalid ip address received")
country = "ZZ"
timezone = "Etc/Universal"
#searching country
for key,_set in set.items():
if IPAddress(ip) in _set:
return str(key)
#searching timezone
for Tzkey,Tzset in setTz[key].items():
if IPAddress(ip) in Tzset:
timezone = str(Tzkey)
break
country = str(key)
break
finally:
self.DataSetlock.release()
return "ZZ"
return country,timezone
async def update_repo(self,bFirst=False):
start = time.time()
print("== DATASET UPDATE REQUESTED ==")
print("Pulling GIT dataset repo")
o = self.gitrepo.remotes.origin
ret = o.pull()
if not bFirst:
if ret[0].flags == 4:
print("no changes detected")
return
print("Done")
tasksIPV4 = []
if DEFAULT__EnableIPV6:
tasksIPV6 = []
loop = asyncio.get_event_loop()
_executor = ProcessPoolExecutor(DEFAULT__Num_IpDataSet_Workers)
print("Parsing IPV4 dataset")
with os.scandir(self.ipv4_dir) as entries:
for entry in entries:
entry_path = os.path.join(self.ipv4_dir,entry.name)
entry_code = Path(entry_path).stem
if DEFAULT_onlyFR and (entry_code != "fr"):
continue
task = loop.run_in_executor(_executor,processfile,entry_path,entry_code)
tasksIPV4.append(task)
print("Done")
if DEFAULT__EnableIPV6:
print("Parsing IPV6 dataset")
with os.scandir(self.ipv6_dir) as entries:
for entry in entries:
entry_path = os.path.join(self.ipv6_dir,entry.name)
entry_code = Path(entry_path).stem
if DEFAULT_onlyFR and (entry_code != "fr"):
continue
task = loop.run_in_executor(_executor,processfile,entry_path,entry_code)
tasksIPV6.append(task)
print("Done")
print("Wait IPV4 parsing to complete...")
results = await asyncio.gather(*tasksIPV4)
_ipv4set=dict()
for _entry_code,_set in results:
_ipv4set[_entry_code] = _set
_ipv4setTz=dict()
if DEFAULT__EnableIPV6:
_ipv6set=dict()
_ipv6setTz=dict()
loop = asyncio.get_event_loop()
_executor = ProcessPoolExecutor(DEFAULT__Num_IpDataSet_Workers)
tasksIPV4 = []
tasksIPV4Tz = []
if DEFAULT__EnableIPV6:
tasksIPV6 = []
tasksIPV6Tz = []
print("== DATASET UPDATE REQUESTED ==")
if DEFAULT__EnableFilesDb:
skip = False
print("Pulling GIT dataset repo")
o = self.gitrepo.remotes.origin
ret = o.pull()
if not bFirst:
if ret[0].flags == 4:
print("no changes detected")
skip = True
print("Done")
if not skip:
print("Parsing IPV4 dataset (files)")
with os.scandir(self.ipv4_dir) as entries:
for entry in entries:
entry_path = os.path.join(self.ipv4_dir,entry.name)
entry_code = Path(entry_path).stem.upper()
if DEFAULT_onlyFR and (entry_code != "fr"):
continue
task = loop.run_in_executor(_executor,processfile,entry_path,entry_code)
tasksIPV4.append(task)
print("IPV4 dataset (files) Started")
if DEFAULT__EnableIPV6:
print("Parsing IPV6 dataset (files)")
with os.scandir(self.ipv6_dir) as entries:
for entry in entries:
entry_path = os.path.join(self.ipv6_dir,entry.name)
entry_code = Path(entry_path).stem.upper()
if DEFAULT_onlyFR and (entry_code != "FR"):
continue
task = loop.run_in_executor(_executor,processfile,entry_path,entry_code)
tasksIPV6.append(task)
print("IPV6 dataset (files) Started")
if DEFAULT__EnableCSVDb:
print("Updating IPV4 dataset (CSV)")
response = requests.get(DEFAULT__Ip2CountryDataSet_CSV_Addresses_IPv4)
txt_resp = response.content.decode()
data = csv.reader(txt_resp.splitlines(), delimiter=',')
print('pre-computing: separating countries')
_IPRange_percountry = dict()
for ip_record in data:
start_ip_block = int(ip_record[0])
end_ip_block = int(ip_record[1])
country = ip_record[2]
if DEFAULT_onlyFR and (country != "FR"):
continue
if country not in _IPRange_percountry:
_IPRange_percountry[country] = []
try:
_IPRange_percountry[country].append(IPRange(start_ip_block,end_ip_block))
except AddrFormatError:
print("[Warning] Wrong IP address format: {0}".format(str(range)))
continue
for key in _IPRange_percountry.keys():
task = loop.run_in_executor(_executor,processIPRangeList,_IPRange_percountry[key],key)
tasksIPV4.append(task)
print("IPV4 dataset (CSV) Started")
if DEFAULT__EnableIPV6:
print("Updating IPV6 dataset (CSV)")
response = requests.get(DEFAULT__Ip2CountryDataSet_CSV_Addresses_IPv6)
txt_resp = response.content.decode()
data = csv.reader(txt_resp.splitlines(), delimiter=',')
print('pre-computing: separating countries')
_IPRange_percountry = dict()
for ip_record in data:
start_ip_block = int(ip_record[0])
end_ip_block = int(ip_record[1])
country = ip_record[2]
if DEFAULT_onlyFR and (country != "FR"):
continue
if country not in _IPRange_percountry:
_IPRange_percountry[country] = []
try:
_IPRange_percountry[country].append(IPRange(IPAddress(start_ip_block,version=6),IPAddress(end_ip_block,version=6)))
except AddrFormatError:
print("[Warning] Wrong IP address format: {0}".format(str(range)))
continue
for key in _IPRange_percountry.keys():
task = loop.run_in_executor(_executor,processIPRangeList,_IPRange_percountry[key],key)
tasksIPV6.append(task)
print("IPV6 dataset (CSV) Started")
if DEFAULT__EnableCSVCitiesDb:
print("processing TimeZones (IPv4)")
response = requests.get(DEFAULT__Ip2CountryDataSet_CSV_Addresses_IPv4_Cities)
byte_resp = gzip.decompress(response.content)
response=None
txt_resp = byte_resp.decode()
byte_resp=None
data = csv.reader(txt_resp.splitlines(), delimiter=',')
txt_resp=None
print('pre-computing: separating countries')
_IPRange_percountry = dict()
for ip_record in data:
start_ip_block = int(ip_record[0])
end_ip_block = int(ip_record[1])
country = ip_record[2]
if DEFAULT_onlyFR and (country != "FR"):
continue
#commented out to free some memory
#state1 = ip_record[3]
#state2 = ip_record[4]
#city = ip_record[5]
#postcode = ip_record[6]
#latitude = ip_record[7]
#longitude = ip_record[8]
timezone = ip_record[9]
if country not in _IPRange_percountry:
_IPRange_percountry[country] = dict()
_IPRange_percountry[country]["IPRanges"] = []
_IPRange_percountry[country]["TZIPRanges"] = dict()
try:
_IPRange_percountry[country]["IPRanges"].append(IPRange(start_ip_block,end_ip_block))
except AddrFormatError:
print("[Warning] Wrong IP address format: {0}".format(line))
continue
if timezone not in _IPRange_percountry[country]["TZIPRanges"]:
_IPRange_percountry[country]["TZIPRanges"][timezone] = []
try:
_IPRange_percountry[country]["TZIPRanges"][timezone].append(IPRange(start_ip_block,end_ip_block))
except AddrFormatError:
print("[Warning] Wrong IP address format: {0}".format(line))
continue
data = None
for country_key in _IPRange_percountry.keys():
for city_key in _IPRange_percountry[country_key]["TZIPRanges"].keys():
print("launch compute of {0} {1}".format(country_key,city_key))
task = loop.run_in_executor(_executor,processIPRangeList,_IPRange_percountry[country_key]["TZIPRanges"][city_key],country_key,city_key)
tasksIPV4Tz.append(task)
_IPRange_percountry = None
print("TimeZones (IPv4) Started")
if DEFAULT__EnableIPV6:
print("processing TimeZones (IPV6)")
response = requests.get(DEFAULT__Ip2CountryDataSet_CSV_Addresses_IPv6_Cities)
byte_resp = gzip.decompress(response.content)
response=None
txt_resp = byte_resp.decode()
byte_resp=None
data = csv.reader(txt_resp.splitlines(), delimiter=',')
txt_resp=None
print('pre-computing: separating countries')
_IPRange_percountry = dict()
for ip_record in data:
start_ip_block = int(ip_record[0])
end_ip_block = int(ip_record[1])
country = ip_record[2]
if DEFAULT_onlyFR and (country != "FR"):
continue
#commented out to free some memory
#state1 = ip_record[3]
#state2 = ip_record[4]
#city = ip_record[5]
#postcode = ip_record[6]
#latitude = ip_record[7]
#longitude = ip_record[8]
timezone = ip_record[9]
if country not in _IPRange_percountry:
_IPRange_percountry[country] = dict()
_IPRange_percountry[country]["IPRanges"] = []
_IPRange_percountry[country]["TZIPRanges"] = dict()
try:
_IPRange_percountry[country]["IPRanges"].append(IPRange(start_ip_block,end_ip_block))
except AddrFormatError:
print("[Warning] Wrong IP address format: {0}".format(line))
continue
if timezone not in _IPRange_percountry[country]["TZIPRanges"]:
_IPRange_percountry[country]["TZIPRanges"][timezone] = []
try:
_IPRange_percountry[country]["TZIPRanges"][timezone].append(IPRange(start_ip_block,end_ip_block))
except AddrFormatError:
print("[Warning] Wrong IP address format: {0}".format(line))
continue
data = None
for country_key in _IPRange_percountry.keys():
for city_key in _IPRange_percountry[country_key]["TZIPRanges"].keys():
print("launch compute of {0} {1}".format(country_key,city_key))
task = loop.run_in_executor(_executor,processIPRangeList,_IPRange_percountry[country_key]["TZIPRanges"][city_key],country_key,city_key)
tasksIPV6Tz.append(task)
_IPRange_percountry = None
print("TimeZones (IPV6) Started")
print("Wait IPV4 parsing to complete (files)...")
results = await asyncio.gather(*tasksIPV4)
for _set,_entry_code in results:
if _entry_code not in _ipv4set:
_ipv4set[_entry_code] = IPSet_nocompact()
_ipv4set[_entry_code].update(_set)
_ipv4set[_entry_code].ForceCompact()
print("Done")
if DEFAULT__EnableIPV6:
print("Wait IPV6 parsing to complete (files)...")
results = await asyncio.gather(*tasksIPV6)
for _set,_entry_code in results:
if _entry_code not in _ipv6set:
_ipv6set[_entry_code] = IPSet_nocompact()
_ipv6set[_entry_code].update(_set)
_ipv6set[_entry_code].ForceCompact()
print("Done")
if DEFAULT__EnableCSVCitiesDb:
print("Wait IPV4 parsing to complete (Cities/Tz)...")
results = await asyncio.gather(*tasksIPV4Tz)
for _set,_country_key,_city_key in results:
print("received result: {0} {1}".format(_country_key,_city_key))
if _country_key not in _ipv4set:
_ipv4set[_country_key] = IPSet_nocompact()
_ipv4set[_country_key].update(_set)
if _country_key not in _ipv4setTz:
_ipv4setTz[_country_key] = dict()
if _city_key not in _ipv4setTz[_country_key]:
_ipv4setTz[_country_key][_city_key] = IPSet_nocompact()
_ipv4setTz[_country_key][_city_key].update(_set)
_ipv4set[_country_key].ForceCompact()
_ipv4setTz[_country_key][_city_key].ForceCompact()
print("Done")
if DEFAULT__EnableIPV6:
print("Wait IPV6 parsing to complete (Cities/Tz)...")
results = await asyncio.gather(*tasksIPV6Tz)
for _set,_country_key,_city_key in results:
print("received result: {0} {1}".format(_country_key,_city_key))
if _country_key not in _ipv6set:
_ipv6set[_country_key] = IPSet_nocompact()
_ipv6set[_country_key].update(_set)
if _country_key not in _ipv6setTz:
_ipv6setTz[_country_key] = dict()
if _city_key not in _ipv6setTz[_country_key]:
_ipv6setTz[_country_key][_city_key] = IPSet_nocompact()
_ipv6setTz[_country_key][_city_key].update(_set)
_ipv6set[_country_key].ForceCompact()
_ipv6setTz[_country_key][_city_key].ForceCompact()
print("Done")
print("Updating live IPV4 storage")
await self.DataSetlock.acquire()
try:
self.ipv4set = _ipv4set
self.ipv4set = _ipv4set
self.ipv4setTz = _ipv4setTz
finally:
self.DataSetlock.release()
print("Done")
if DEFAULT__EnableIPV6:
print("Wait IPV6 parsing to complete...")
results = await asyncio.gather(*tasksIPV6)
_ipv6set=dict()
for _entry_code,_set in results:
_ipv6set[_entry_code] = _set
print("Done")
print("Updating live IPV6 storage")
await self.DataSetlock.acquire()
try:
self.ipv6set = _ipv6set
self.ipv6set = _ipv6set
self.ipv6setTz = _ipv6setTz
finally:
self.DataSetlock.release()
print("Done")
end = time.time()
gc.collect()
print("Elapsed time: {0} seconds".format(end - start))
print("== DATASET UPDATE FINISHED ==")
@@ -224,7 +486,7 @@ class ChaChaIPToCountryDaemon:
if "IpDataSet_GitRepo_Address" in kwargs:
DEFAULT__IpDataSet_GitRepo_Address = kwargs["IpDataSet_GitRepo_Address"]
GitRepo_user = None
if "IpDataSet_GitRepo_user" in kwargs:
GitRepo_user = kwargs["IpDataSet_GitRepo_user"]
@@ -257,12 +519,14 @@ class ChaChaIPToCountryDaemon_Handler(tornado.web.RequestHandler):
async def getIP2Country_Full(self,ip):
print("getting ip info : {0}".format(ip))
result = dict()
result["alpha_2"] = await self.getIP2Country(ip)
city,_timezone = await self.getIP2Country(ip)
print("debug: {0} {1}".format(city,_timezone))
result["alpha_2"] = city
result["dns"]="unknown.foo"
result["alpha_3"]="ZZZ"
result["coutry_name"]="Unknown"
result["coutry_official_name"]="Unknown"
result["utc_time"]= datetime.now(timezone("UTC")).strftime('%H:%M')
result["utc_time"]= datetime.now(timezone(_timezone)).strftime('%H:%M')
result["ip"]=ip
print(result["alpha_2"])
if result["alpha_2"] != "ZZ":

View File

@@ -17,4 +17,5 @@ class Test_ChaChaIPToCountryDaemon_base(unittest.TestCase):
print("======================")
def test_simplerun(self):
tmp = ChaChaIPToCountryDaemon(EnableIPV6=False,Num_IpDataSet_Workers=12,IpDataSet_UpdatePeriod_Second=600)
print("starttest")
tmp = ChaChaIPToCountryDaemon(EnableIPV6=True,Num_IpDataSet_Workers=12,IpDataSet_UpdatePeriod_Second=600)