利用多线程加队列(redis),批量请求。
# encoding : utf-8
# __author__ = 'jm'
import logging
import threading
import datetime
import asyncio
import json
import time
import requests
from requests import exceptions
import os, sys
import django
import argparse
from django.db import connections
from utils import MyAdapter
import queue
import pymysql
# Directory two levels above this file.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Make that directory and its ``src`` / ``data_model`` sub-directories
# importable (appended in this order, after any existing entries).
sys.path.extend(
    [
        BASE_DIR,
        os.path.join(BASE_DIR, 'src'),
        os.path.join(BASE_DIR, 'data_model'),
    ]
)
# Logger shared by every class and function in this module.
info_logger = logging.getLogger("pull_info")
class MysqlClient(object):
    """Context manager around a pymysql connection with a dict cursor.

    Connection parameters are read from ``settings`` when the client is
    constructed. ``settings`` is looked up as a module global at call time;
    in this file it is bound by the ``from django.conf import settings``
    statement in the ``__main__`` block.

    Usage::

        with MysqlClient() as client:
            rows = client.select("SELECT ... WHERE id = %s", some_id)
    """

    def __init__(self):
        self.host = settings.MYSQL_HOST
        self.port = settings.MYSQL_PORT
        # A trailing comma here used to turn the user name into a 1-tuple.
        self.user = settings.MYSQL_USER
        self.password = settings.MYSQL_PASSWORD
        self.database = settings.MYSQL_DB
        self.db = None
        self.cursor = None

    def __enter__(self):
        """Open the connection and a ``DictCursor``; return ``self``."""
        self.db = pymysql.connect(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            database=self.database,
        )
        try:
            self.cursor = self.db.cursor(pymysql.cursors.DictCursor)
        except Exception:
            # __exit__ is not invoked when __enter__ raises, so release the
            # connection here instead of leaking it.
            self.db.close()
            raise
        return self

    def select(self, sql, *params):
        """Execute ``sql`` with positional ``params`` and return all rows.

        Args:
            sql: Query text handed to ``cursor.execute``.
            *params: Values passed to ``cursor.execute`` as one tuple.

        Returns:
            Whatever ``cursor.fetchall()`` yields for the executed query.
        """
        self.cursor.execute(sql, params)
        return self.cursor.fetchall()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the cursor and then the connection; exceptions propagate."""
        try:
            self.cursor.close()
        finally:
            # Close the connection even when closing the cursor fails.
            self.db.close()
class Worker(threading.Thread):
    """Thread that drains ``task_queue`` and pulls campaigns for each task.

    Every queue item is a JSON string with the keys ``access_token`` and
    ``accounts_ids``. One batch request is posted per item; each sub-response
    is paged through and its campaigns are bulk-inserted.
    """

    # Total POST attempts per task before a connection failure is abandoned.
    MAX_ATTEMPTS = 5
    # Seconds slept between attempts and after an unexpected task error.
    RETRY_DELAY = 0.5
    BATCH_URL = "https://graph.facebook.com/"
    # NOTE(review): the space before ``account_id`` is kept exactly as it was;
    # confirm the API tolerates it.
    CAMPAIGN_URL = "v3.2/act_{}/campaigns?fields=name,objective,can_use_spend_cap,status,spend_cap,effective_status,promoted_object,daily_budget,lifetime_budget,start_time,end_time, account_id&limit=1000"

    def __init__(self, task_queue, **kwargs):
        """Store the queue to consume; ``kwargs`` go to ``threading.Thread``."""
        threading.Thread.__init__(self, **kwargs)
        self.task_queue = task_queue
        # Retained so existing attribute access keeps working. Retry state is
        # now a plain loop counter, so nothing is stored on it any more.
        self.local_thread_ = threading.local()

    def _handle_account_campaign(self, account):
        """Fetch and store the campaigns for one batch of ad accounts.

        Args:
            account: JSON string with ``access_token`` and ``accounts_ids``.

        Raises:
            ValueError, KeyError: if ``account`` is not valid JSON or lacks
                one of the two keys (handled by ``run``).
        """
        account_obj = json.loads(account)
        request_list = [
            {"method": "GET", "relative_url": self.CAMPAIGN_URL.format(account_id)}
            for account_id in account_obj["accounts_ids"]
        ]
        params = {
            "access_token": account_obj["access_token"],
            "batch": json.dumps(request_list),
        }
        batch_results = self._post_batch_with_retry(params)
        if not isinstance(batch_results, list):
            # None means the request itself failed and was already logged;
            # anything else is a decoded payload that is not a result list.
            if batch_results is not None:
                info_logger.error("Unexpected batch response: %s", batch_results)
            return
        for item in batch_results:
            self._store_campaigns(item)

    # Earlier name of the handler, kept as an alias. ``run`` always called
    # ``_handle_account_campaign``, which did not exist, so no task was
    # ever processed.
    _handle = _handle_account_campaign

    def _post_batch_with_retry(self, params):
        """POST one batch request; return the decoded JSON, or None on failure."""
        for attempt in range(1, self.MAX_ATTEMPTS + 1):
            try:
                # Sessions are context managers: the connection pool is
                # released on exit instead of being leaked per call.
                with requests.Session() as session:
                    session.mount("https://", MyAdapter())
                    with session.post(self.BATCH_URL, params=params) as response:
                        return json.loads(response.text)
            except exceptions.ConnectionError as err:
                info_logger.info("Timeout: %s", err)
                if attempt < self.MAX_ATTEMPTS:
                    time.sleep(self.RETRY_DELAY)
            except Exception:
                info_logger.exception("Batch request failed")
                return None
        return None

    def _store_campaigns(self, item):
        """Collect every page of one batch sub-response and bulk-insert it."""
        try:
            base_data = json.loads(item["body"])
            campaign_list = base_data["data"]
            try:
                # Best effort: if paging fails, only the first page is stored.
                campaign_list.extend(self.get_next_page_data(base_data, []))
            except Exception:
                info_logger.exception("Failed to fetch follow-up campaign pages")
            campaign_bulk = []
            for campaign_data in campaign_list:
                try:
                    # NOTE(review): the model construction from campaign_data
                    # is elided in this file; fill it in before use.
                    campaign_obj = ...
                    # Must go to campaign_bulk: appending to campaign_list
                    # here grew the list being iterated and never terminated.
                    campaign_bulk.append(campaign_obj)
                except Exception:
                    info_logger.exception("Skipping campaign that could not be built")
                    continue
            XXXXXX.objects.bulk_create(campaign_bulk)
        except Exception:
            info_logger.exception("Failed to store campaigns for a batch item")

    def get_next_page_data(self, origin_data, return_data):
        """Follow ``paging.next`` links and collect each page's ``data``.

        Args:
            origin_data: Decoded page whose ``paging.next`` (if any) is the
                first URL to fetch; its own ``data`` is not added.
            return_data: List extended in place with the fetched rows.

        Returns:
            ``return_data``, after appending the rows of every page reached.
            Paging stops at the first page without ``paging.next`` or
            without ``data``.
        """
        next_page_url = self._next_page_url(origin_data)
        if next_page_url is None:
            return return_data
        # One session for all pages, closed when paging ends.
        with requests.Session() as session:
            session.mount("https://", MyAdapter())
            while next_page_url is not None:
                with session.get(next_page_url) as response:
                    page = json.loads(response.text)
                if "data" not in page:
                    break
                return_data.extend(page["data"])
                next_page_url = self._next_page_url(page)
        return return_data

    @staticmethod
    def _next_page_url(page):
        """Return ``page['paging']['next']``, or None when a key is absent."""
        try:
            return page["paging"]["next"]
        except KeyError:
            return None

    def run(self):
        """Process queue items until the queue is empty, then exit."""
        while True:
            try:
                account = self.task_queue.get(False)
            except queue.Empty:
                info_logger.info("campaign_queue is empty now!")
                break
            try:
                self._handle_account_campaign(account)
            except Exception as e:
                info_logger.info(e)
                time.sleep(self.RETRY_DELAY)
class Service(object):
    """Load ad-account ids per access token and fan them out to workers."""

    def __init__(self):
        """Read all accounts from MySQL and create an empty task queue."""
        self.accounts_list = self._get_accounts_list()
        self.task_queue = queue.Queue()

    def _get_accounts_list(self):
        """Return one ``{"access_token", "accounts_ids"}`` dict per token.

        A single MySQL connection serves the token query and every
        per-token lookup, instead of opening one connection per token.
        """
        sql = "xxxxxxx"
        with MysqlClient() as mysql_handler:
            access_token_list = [
                item["access_token"] for item in mysql_handler.select(sql)
            ]
            return [
                self._get_account_ids(access_token, mysql_handler)
                for access_token in access_token_list
            ]

    def _get_account_ids(self, access_token, mysql_handler=None):
        """Look up the account ids that belong to ``access_token``.

        Args:
            access_token: Value bound as the query's only parameter.
            mysql_handler: Optional open ``MysqlClient`` to reuse. When
                omitted, a connection is opened and closed for this call.

        Returns:
            ``{"access_token": access_token, "accounts_ids": [...]}``.
        """
        sql = "xxx"
        if mysql_handler is None:
            with MysqlClient() as own_handler:
                return self._get_account_ids(access_token, own_handler)
        account_ids = [
            item["account_id"] for item in mysql_handler.select(sql, access_token)
        ]
        return {"access_token": access_token, "accounts_ids": account_ids}

    def _convert_accounts_info(self, batch_size=50):
        """Split each token's account ids into JSON-encoded task strings.

        Args:
            batch_size: Maximum number of account ids per task. The default
                of 50 is the value that was previously hard-coded.
                NOTE(review): presumably the batch API's per-request limit;
                confirm against the API documentation.

        Returns:
            List of JSON strings, each holding ``access_token`` and up to
            ``batch_size`` ``accounts_ids``. Tokens without accounts
            contribute nothing.

        Raises:
            ValueError: if ``batch_size`` is 0 (raised by ``range``).
        """
        tasks = []
        for grouped_accounts in self.accounts_list:
            accounts_ids = grouped_accounts["accounts_ids"]
            access_token = grouped_accounts["access_token"]
            for start in range(0, len(accounts_ids), batch_size):
                chunk = accounts_ids[start:start + batch_size]
                tasks.append(
                    json.dumps({"access_token": access_token, "accounts_ids": chunk})
                )
        return tasks

    def run(self, worker_count=20):
        """Queue every task, then run the workers until all have finished.

        Args:
            worker_count: Number of ``Worker`` threads to start (default 20,
                the previously hard-coded value).
        """
        for account in self._convert_accounts_info():
            self.task_queue.put(account)
        # The queue is fully populated before any worker starts, because a
        # worker exits as soon as it finds the queue empty.
        workers = [Worker(self.task_queue) for _ in range(worker_count)]
        for worker in workers:
            worker.daemon = True
            worker.start()
        for worker in workers:
            worker.join()
def pull_account_campaign():
    """Close all Django DB connections, then build and run a ``Service``."""
    # Connections are closed before any worker thread is created.
    django.db.connections.close_all()
    service = Service()
    service.run()
if __name__ == '__main__':
    # Command line: -d/--execute_dir names the directory whose ``conf``
    # sub-directory is added to sys.path; it defaults to BASE_DIR.
    ap = argparse.ArgumentParser()
    ap.add_argument(
        '-d', '--execute_dir',
        default=BASE_DIR,
        type=str,
        help='执行目录',
    )
    args = ap.parse_args()
    sys.path.append(os.path.join(args.execute_dir, "conf"))

    # Django must be configured before the model and settings imports below.
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_settings")
    django.setup()

    # These imports bind module-level names (notably ``settings``) that the
    # classes above look up at call time.
    from data_model.models import xxxx, xxxxx, xxxxx, xxxxx, xxxx
    from django.conf import settings

    info_logger = logging.getLogger("pull_info")
    pull_account_campaign()