Python 實現批量查詢IP並解析為歸屬地

一、背景:
最近工作中做了一個小功能,目的是為了分析註冊用戶區域分佈和訂單的區域分佈情況。所以需要將其對應的IP信息解析為歸屬地,並同步每天同步更新。
線上跑起來效率還是有優化的空間,優化的方向:在調用IP查詢API過程可以調整為多線程并行解析IP。後續會更新這方便的調整。
技術: Pyhton3
postgreSQL
env配置文件

附加信息:iP地址查詢(iP138官方企業版):https://market.aliyun.com/products/56928004/cmapi015606.html#sku=yuncode960600002
.可提供免費的IP查詢API.

二、實現思路: 1、 讀取數據庫IP信息
2、 調用第三方IP解析API進行解析
3、 將解析歸屬地信息存入數據庫
三、幾點說明: 1、環境信息等參數配置
2、日誌輸出
3、異常處理: 數據庫連接異常
請求連接查詢IP的URL異常:HTTP ERROR 503
4、json,字典,數組等類型數據輸入輸出
5、分頁查詢並批量解析
5.功能實現很簡單,所以就沒有做詳細的介紹了。詳情可直接看完整代碼,有詳細的備註。

四、步驟簡單介紹:
針對實現思路的3個步驟寫了3個函數,彼此調用執行。
函數:
def get_ip_info(table_name):
def get_ip_area(table_name):
def ip_write_db(table_name):
調用:
ip_write_db("h_user_stat")

五、關鍵代碼說明:
語法:urllib.request.urlopen(url, data=None, [timeout, ]*, cafile=None, capath=None, cadefault=False, context=None)
 # 對從數據庫表中出查詢的IP進行解析
       querys = 'callback&datatype=jsonp&ip=' + get_ip
       bodys = {}
       url = host + path + '?' + querys
       request = urllib.request.Request(url)
       request.add_header('Authorization', 'APPCODE ' + appcode)

       # 連接url時可能會出現 ERROR: HTTP Error 503: Service Unavailable
       try: 
         response = urllib.request.urlopen(request)
       except Exception as e:
         logging.error(e) # 輸出異常日誌信息 
         time.sleep(5)
         response = urllib.request.urlopen(request)
       finally:
         content = response.read()
         ip_area = content.decode('utf8')
         ip_area = json.loads(ip_area)['data'] # json類型轉字典類型並取'data'健值
         arr.append([get_ip, ip_area]) # 將結果集存於二元數組
說明:從數據庫分頁查詢固定數量的IP存入數組,並遍歷該數組並將解析后的地區信息data健值存於二元數組中。

 

 
六、Python代碼實現如下:

  1 # 導入psycopg2包
  2 import psycopg2, time,datetime,sys
  3 import json
  4 import urllib, urllib.request
  5 import os
  6 import configparser
  7 import logging
  8                      # purpose: 連接數據庫讀取表IP信息
  9 def get_ip_info(table_name):
 10     # 全局變量作用局部作用域
 11     global pagesize # 每頁查詢數據條數
 12     global rows_count
 13 
 14     # 測試1
 15     starttime_1 = time.time()
 16     # 建立游標,用來執行數據庫操作
 17     cur = conn.cursor()
 18     # 執行SQL命令
 19     cur.execute("SELECT remote_ip FROM (select remote_ip,min(created_at) from " + table_name + " group by remote_ip) h1 where remote_ip is not null and remote_ip <> '' and  not exists (select 1 from d_ip_area_mapping h2 where h1.remote_ip = h2.remote_ip) limit " + str(pagesize) + ";")
 20 
 21 
 22     # 獲取結果集條數
 23     rows_count = cur.rowcount
 24    
 25     # print('解析用戶IP的總數:' + str(rows_count))
 26 
 27      # 當有未解析的用戶的IP,返回元組,否則退出程序
 28     if rows_count > 0:
 29       # 獲取SELECT返回的元組
 30       rows =  cur.fetchall()        # all rows in table
 31 
 32       for row in rows:
 33           tuple = rows
 34 
 35       conn.commit()
 36       # 關閉游標
 37       cur.close()
 38 
 39     else:
 40         tuple = []
 41     logging.info('每頁查詢秒數:' + str(time.time() - starttime_1))
 42     return tuple
 43     # 調用解析函數
 44 
 45 
 46 def get_ip_area(table_name):
 47   # 內包含用戶ID和IP的數組的元組
 48   tuple = get_ip_info(table_name)  
 49 
 50   # 測試2
 51   starttime_2 = time.time()
 52   host = 'http://ali.ip138.com'
 53   path = '/ip/'
 54   method = 'GET'
 55   appcode = '917058e6d7c84104b7cab9819de54b6e'
 56   arr = []
 57   for row in tuple:
 58 
 59        get_ip = row[0]
 60        #get_user = "".join(str(row))
 61        #get_user = row[0]
 62 
 63             # 對從數據庫表中出查詢的IP進行解析
 64        querys = 'callback&datatype=jsonp&ip=' + get_ip
 65        bodys = {}
 66        url = host + path + '?' + querys
 67        request = urllib.request.Request(url)
 68        request.add_header('Authorization', 'APPCODE ' + appcode)
 69 
 70        # 連接url時可能會出現 ERROR: HTTP Error 503: Service Unavailable
 71        try: 
 72          response = urllib.request.urlopen(request)
 73        except Exception as e:
 74          logging.error(e) # 輸出異常日誌信息 
 75          time.sleep(5)
 76          response = urllib.request.urlopen(request)
 77        finally:
 78          content = response.read()
 79          ip_area = content.decode('utf8')
 80          ip_area = json.loads(ip_area)['data'] # json類型轉字典類型並取'data'健值
 81          arr.append([get_ip, ip_area]) # 將結果集存於二元數組
 82   logging.info('每頁解析秒數:' + str(time.time() - starttime_2))
 83   return  arr
 84 
 85 
 86 def ip_write_db(table_name):
 87    
 88     write_ip = get_ip_area(table_name)  # 內包含用戶ID和IP的數組的元組
 89 
 90 
 91     # 測試1
 92     starttime_3 = time.time()
 93 
 94      # 建立游標,用來執行數據庫操作
 95     cur = conn.cursor()
 96     for row in write_ip:
 97         # get_user = row[0]  # 獲取用戶ID
 98         get_ip = row[0]  # 獲取用戶對應的IP
 99         country = row[1][0]  # 獲取IP解析后的地區:國家
100         province = row[1][1]  # 獲取IP解析后的地區:省
101         city = row[1][2]  # 獲取IP解析后的地區:市
102         isp = row[1][3]  # 獲取IP解析后的服務提供商
103 
104         # 執行SQL命令
105         sql = "insert into d_ip_area_mapping(remote_ip,country,province,city,isp,created_at,updated_at,job_id) values (%s,%s,%s,%s,%s,%s,%s,%s);"
106         val = [get_ip, country, province, city, isp, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
107                time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),time.strftime("%Y-%m-%d",time.localtime())]
108         
109         cur.execute(sql, val)
110         conn.commit()    
111     # 關閉游標
112     cur.close()
113     logging.info('每頁插入秒數:' + str(time.time() - starttime_3))
114 
115 
116 # 1.程序開始執行計時
117 starttime = time.time()
118 
119 
120      # 讀取配置文件環境信息 
121 
122 # 項目路徑
123 rootDir = os.path.split(os.path.realpath(__file__))[0]
124 
125 
126 ############################### config.env文件路徑  #############################################################
127 
128 configFilePath = os.path.join(rootDir, 'db_udw.env')
129 config = configparser.ConfigParser()
130 config.read(configFilePath)
131 
132 # 讀取數據庫環境信息
133 db_database = config.get('postgresql','database')
134 db_user = config.get('postgresql','user')
135 db_password = config.get('postgresql','password')
136 db_host = config.get('postgresql','host')
137 db_port = config.get('postgresql','port')
138 
139 # 讀取輸出日誌路徑
140 log = config.get('log','log_path')
141 
142 # 每頁查詢數據條數
143 pagesize = config.get('page','pagesize') 
144 
145 # 讀取解析IP條數限制
146 ip_num_limit = config.get('ip_num','ip_num_limit') 
147 
148 # 配置輸出日誌格式
149 logging.basicConfig(level=logging.DEBUG,#控制台打印的日誌級別
150                       filename='{my_log_path}/ip_analyzer.log'.format(my_log_path=log),  # 指定日誌文件及路徑
151                       filemode='a',##模式,有w和a,w就是寫模式,每次都會重新寫日誌,覆蓋之前的日誌 #a是追加模式,默認如果不寫的話,就是追加模式
152                       format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'#日誌格式
153                       )
154 
155 ###############################   程序開始執行  #############################################################
156 try:
157 
158   # 連接到一個給定的數據庫
159   conn = psycopg2.connect(database=db_database, user=db_user, password=db_password, host=db_host, port=db_port)
160 except Exception as e:
161   logging.error(e) # 輸出連接異常日誌信息
162 
163 # 返回查詢行數 默認為0
164 rows_count = 0
165  # 用戶表IP解析總數
166 user_ip_num = 0 
167  # 訂單表IP解析總數
168 order_ip_num = 0 
169 
170 
171 
172 try:
173 
174   # 解析用戶表註冊IP信息
175   while user_ip_num <= eval(ip_num_limit):
176      i = 1  # 循環次數
177      ip_write_db("h_user_stat")
178      user_ip_num = user_ip_num + rows_count*i
179      i  = i + 1
180      if rows_count == 0 :
181          break
182 
183   # 解析訂單表下單IP信息 
184   while user_ip_num <= eval(ip_num_limit):
185       # 解析用戶表註冊IP信息
186       i = 1  # 循環次數
187       ip_write_db("h_order")
188       order_ip_num = order_ip_num + rows_count*i
189       i = i + 1
190       if rows_count == 0 :
191          break
192 except Exception as e:
193   logging.error(e) # 輸出異常日誌信息
194 finally:
195   # 關閉數據庫連接
196   conn.close()
197 
198 # 2 程序結束執行計時
199   endtime = time.time()
200   
201   # print('解析用戶IP的總數:' + str(user_ip_num))
202   # print('解析訂單IP的總數:' + str(order_ip_num))
203   # # 打印程序執行總耗時
204   # print('解析總耗時秒數:' + str(endtime - starttime))
205   logging.info('解析用戶IP的總數:' + str(user_ip_num))
206   logging.info('解析訂單IP的總數:' + str(order_ip_num))
207   logging.info('解析總耗時秒數:' + str(endtime - starttime))


 環境配置db_udw.envdb_udw.env 如下:

# 數據庫環境信息
[postgresql]
database = ahaschool_udw
user = admin
password = 123456
host = 127.0.0.0
port = 5432

# 設置日誌文件路徑
[log]
log_path = /home/hjmrunning/bi_etl_product/scripts/log

# 每頁查詢數據條數
[page]
pagesize = 1000  

# IP解析條數限制
[ip_num]
ip_num_limit = 150000

最後

    我接觸Python時間也不是很久,實現方法可能會有疏漏。如果有什麼疑問和見解,歡迎評論區交流。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

点赞

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *