# Quota exceeded for quota metric 'Get requests' and limit 'Get requests per minute' of service 'googleads.googleapis.com' for consumer 'project_number:'
# coding: utf-8
from __future__ import absolute_import
import os
import sys
from functools import partial
import argparse
import datetime
import time
import logging
import signal
import threading
#from gevent.pool import Pool
#from gevent.pool import Group
#from gevent import Greenlet
#import gevent
# gevent monkey patching does not work with grpc,
# and async does not work either,
# so multithreading is used instead.
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
import google.ads.google_ads.client
import google.oauth2.credentials
# logging.getLogger('google.ads.google_ads.client').setLevel(logging.INFO)
def wrapper(retry_time, func):
    """Wrap *func* so that a failing call is retried up to *retry_time* times.

    Every call of the returned function gets its own budget of *retry_time*
    guarded attempts.  A quota error (``ResourceExhausted``) is followed by a
    10 second pause before the next attempt; any other exception is retried
    immediately.  Once the budget is used up, *func* is called one final time
    without protection, so its exception (if any) reaches the caller.

    Args:
        retry_time: Number of guarded attempts per call.
        func: The callable to protect.

    Returns:
        A function with the same signature and metadata as *func*.
    """
    from functools import wraps

    try:
        from google.api_core.exceptions import ResourceExhausted
    except ImportError:
        # Without google-api-core no quota error can ever be raised, so a
        # private placeholder that never matches keeps the handler valid.
        class ResourceExhausted(Exception):
            """Placeholder used when google-api-core is not installed."""

    @wraps(func)
    def inner(*args, **kw):
        # Count down on a per-call copy: decrementing the shared closure
        # variable would leave later calls with no retries at all.
        attempts_left = retry_time
        while attempts_left > 0:
            try:
                return func(*args, **kw)
            except ResourceExhausted as e:
                # google-api-core errors expose ``message``; fall back to the
                # exception itself rather than failing inside the handler.
                print(f" 請求資源過多異常 with {getattr(e, 'message', e)}")
                time.sleep(10)
            except Exception as e:
                print(f"retried with 其他异常: {str(e)}")
            attempts_left -= 1
        return func(*args, **kw)

    return inner
def retry(retry_time):
    """Build a decorator that applies ``wrapper`` with *retry_time* bound.

    Usage: ``@retry(3)`` above a function definition.
    """
    decorator = partial(wrapper, retry_time)
    return decorator
# strftime/strptime pattern for "YYYY-MM-DD" dates; used to format the default cost date.
TIME_FORMAT = "%Y-%m-%d"
class GoogleThreadHandler:
    """Collect one day's Google Ads spend for the accounts one user can reach.

    The handler lists the customers accessible to the user, expands every
    manager (MCC) account into its client accounts and then queries the cost
    of each client account for the date held in the module global
    ``YESTERDAY``.  All API calls are fanned out over thread pools.

    NOTE(review): ``settings`` and ``YESTERDAY`` are read as module globals
    that are only bound when this file is executed as a script.
    """

    # Upper bound on the worker threads of every pool created by the handler.
    max_workers = 50

    def __init__(self, user_credentials):
        """Store the user's credentials and create the API client.

        Args:
            user_credentials: Mapping with ``"credentials"`` (keyword
                arguments for ``google.oauth2.credentials.Credentials``) and
                ``"user_name"``.
        """
        self.credentials = user_credentials["credentials"]
        self.user_name = user_credentials["user_name"]
        # Ids of non-manager accounts directly accessible to the user.
        self.customer_under_user = []
        # (customer_id, ga_service, parent_mcc) for each account below an MCC.
        self.customer_under_mcc = []
        # Same tuples, for accounts whose cost lookup raised an exception.
        self.error_customer_under_mcc = []
        # Formatted account id -> (account_id, account_name, cost).
        self.account_cost_dict = dict()
        self.client = self._init()

    def _run(self):
        """Walk the account tree and return the collected cost mapping.

        Returns:
            ``self.account_cost_dict`` after all lookups have finished.
        """
        resource_names = self.get_customer_names()

        # 1. Find the MCCs of the user; plain accounts of the user are put
        #    into ``customer_under_user`` along the way.
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            pending = [
                executor.submit(self.get_mcc_and_account_under_user, resource_name)
                for resource_name in resource_names
            ]
            mcc_under_user_list = []
            for future in as_completed(pending):
                mcc = future.result()
                if mcc is not None:
                    mcc_under_user_list.append(mcc)

        # 2. Put the plain customer accounts found at every level below each
        #    MCC into ``customer_under_mcc``.
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            executor.map(self.get_customer_under_mcc, mcc_under_user_list)

        # 3. Fetch the cost data of the accounts.
        time.sleep(0.05)
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            executor.map(self.get_customer_cost_last_day, self.customer_under_mcc)

        # Retry the failures once.  Work on a snapshot and start a fresh error
        # list: workers append failures while the pool is still being fed, so
        # mapping over the live list could resubmit the same account without
        # bound.
        time.sleep(0.05)
        retry_batch = self.error_customer_under_mcc
        self.error_customer_under_mcc = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            executor.map(self.get_customer_cost_last_day, retry_batch)

        # 4. The caller filters and aggregates the data and sends the e-mail.
        return self.account_cost_dict

    def _init(self, parent_mcc=None):
        """Create a ``GoogleAdsClient`` for the stored credentials.

        Args:
            parent_mcc: Optional manager customer id passed to the client as
                ``login_customer_id``.

        Returns:
            The new client.
        """
        credentials = google.oauth2.credentials.Credentials(**self.credentials)
        if parent_mcc is None:
            return google.ads.google_ads.client.GoogleAdsClient(
                credentials, settings.GG_DEVELOP_TOKEN
            )
        return google.ads.google_ads.client.GoogleAdsClient(
            credentials, settings.GG_DEVELOP_TOKEN, login_customer_id=parent_mcc
        )

    def get_customer_names(self):
        """Return the resource names of the customers accessible to the user."""
        customer_service = self.client.get_service('CustomerService', version='v2')
        customers = customer_service.list_accessible_customers()
        return customers.resource_names

    def get_mcc_and_account_under_user(self, resource_name):
        """Classify one accessible customer as manager or plain account.

        Args:
            resource_name: Customer resource name; the segment after the last
                ``/`` is used as the customer id.

        Returns:
            The customer id as ``str`` when the customer is a manager account.
            ``None`` for a test account, for a plain account (its id is added
            to ``customer_under_user``) and when the lookup fails.
        """
        ga_service = self.client.get_service('GoogleAdsService', version='v2')
        customer_id = resource_name.split("/")[-1]
        query = """
            SELECT
                customer.manager,
                customer.test_account,
                customer.time_zone,
                customer.currency_code,
                customer.descriptive_name,
                customer.id
            FROM
                customer
        """
        try:
            customer_results = ga_service.search(customer_id, query=query)
            for c in customer_results:
                if c.customer.test_account.value:
                    return None
                if c.customer.manager.value:
                    return str(c.customer.id.value)
                self.customer_under_user.append(str(c.customer.id.value))
        except Exception as exc:
            # Best effort: an unreadable customer is skipped, but no longer
            # silently.
            logging.warning("Failed to inspect customer %s: %s", customer_id, exc)
        return None

    def get_customer_cost_last_day(self, ga_service_customer_id):
        """Record the cost of one account for ``YESTERDAY`` if it is billable.

        The approved billing setups of the account are read newest first.  The
        first one that started on or before ``YESTERDAY`` decides: unless its
        payments profile is "BlueVision Interactive Limited" the account is
        skipped, otherwise the cost is queried and a positive cost is stored
        in ``account_cost_dict``.

        Any exception puts the work item into ``error_customer_under_mcc`` and
        is logged.

        Args:
            ga_service_customer_id: ``(customer_id, ga_service, parent_mcc)``
                as produced by ``get_customer_under_mcc``.
        """
        customer_id, ga_service, parent_mcc = ga_service_customer_id
        query_cost = """
            SELECT
                customer.descriptive_name,
                metrics.cost_micros
            FROM
                customer
            WHERE
                segments.date BETWEEN '{}' AND '{}'
            LIMIT
                100
        """.format(YESTERDAY, YESTERDAY)
        query_payments = """
            SELECT
                billing_setup.payments_account,
                billing_setup.start_date_time,
                billing_setup.payments_account_info.payments_account_name,
                billing_setup.payments_account_info.payments_profile_name
            FROM
                billing_setup
            WHERE
                billing_setup.status IN ('APPROVED', 'APPROVED_HELD')
            ORDER BY
                billing_setup.start_date_time DESC
        """
        try:
            results_payments = ga_service.search(customer_id, query=query_payments)
            for p in results_payments:
                billing_setup = p.billing_setup
                payments_profile_name = (
                    billing_setup.payments_account_info.payments_profile_name.value
                )
                bill_start_time = billing_setup.start_date_time.value.split()[0]

                # Newest first: a setup that started after the cost date
                # cannot have been in use yet, so look at the next older one.
                if not compare_time(YESTERDAY, bill_start_time):
                    continue

                # The cost of that day was produced under this billing setup.
                if payments_profile_name != "BlueVision Interactive Limited":
                    logging.info(
                        f"不算消耗, {payments_profile_name},cost_date: {YESTERDAY}, "
                        f"bill_start_time: {bill_start_time}, customer_id: {customer_id}, MCC: {parent_mcc}"
                    )
                    return

                logging.info(
                    f"计算消耗, {payments_profile_name},cost_date: {YESTERDAY}, "
                    f"bill_start_time: {bill_start_time}, customer_id: {customer_id}, MCC: {parent_mcc}"
                )
                results = ga_service.search(customer_id, query=query_cost)
                account_id = google_format(customer_id)
                for r in results:
                    account_name = r.customer.descriptive_name.value
                    try:
                        cost = r.metrics.cost_micros.value
                    except Exception:
                        # Row without a readable cost: nothing to record.
                        continue
                    if cost > 0:
                        cost = format_cost_float(micros_to_currency(cost))
                        self.account_cost_dict[account_id] = (
                            account_id,
                            account_name,
                            cost,
                        )
                return
        except Exception as e:
            self.error_customer_under_mcc.append((customer_id, ga_service, parent_mcc))
            logging.error(f"Error with, {YESTERDAY}, {customer_id}, {parent_mcc}, reason: {e}")

    def get_customer_under_mcc(self, mcc_id):
        """Collect the plain accounts below one manager account.

        Every non-manager client is appended to ``customer_under_mcc`` as
        ``(customer_id, ga_service, parent_mcc)``; manager clients with a
        non-zero level are expanded recursively in a thread pool.

        Args:
            mcc_id: Manager customer id; a falsy value makes this a no-op.
        """
        if not mcc_id:
            return

        query = """
            SELECT
                customer_client.id,
                customer_client.descriptive_name,
                customer_client.currency_code,
                customer_client.time_zone,
                customer_client.manager,
                customer_client.level
            FROM
                customer_client
        """
        client = self._init(mcc_id)
        ga_service = client.get_service('GoogleAdsService', version='v2')

        # Nested so that every recursion level shares this one client instead
        # of creating a new one.
        def inner(parent_mcc, current_mcc):
            mcc_list = []
            try:
                results = ga_service.search(customer_id=current_mcc, query=query, retry=False)
                for item in results:
                    customer_id = str(item.customer_client.id.value)
                    is_manager = item.customer_client.manager.value
                    level = item.customer_client.level.value
                    if not is_manager:
                        self.customer_under_mcc.append((customer_id, ga_service, parent_mcc))
                    elif level != 0:
                        # Level 0 is excluded so the queried MCC itself is
                        # not expanded again.
                        mcc_list.append(customer_id)
            except Exception as e:
                logging.info(f"获取customer失败, 原因: {e}")

            time.sleep(0.05)
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                executor.map(partial(inner, parent_mcc), mcc_list)

        inner(mcc_id, mcc_id)
#class GoogleGeventHandler(Greenlet):
#
# def __init__(self, user_credentials, pool, group):
# Greenlet.__init__(self)
# self.credentials = user_credentials["credentials"]
# self.user_name = user_credentials["user_name"]
# self.group = group
# self.pool = pool
# self.customer_under_user = []
# self.customer_under_mcc = []
# self.account_cost_set = set()
# self.client = self._init()
#
#
# def _run(self):
# resource_names = self.get_customer_names()
# # igroup = Group()
# mcc_under_user_list = self.group.imap_unordered(self.get_mcc_and_account_under_user, resource_names)
# # 1 获取user下面的 mcc 并把 user下的account放入 customer_under_user列表中
#
# self.pool.map(self.get_customer_under_mcc, mcc_under_user_list)
# # 2 依次把各个层级下的mcc 中的 普通 customer account 放入 customer_under_mcc 列表
#
# # 3 获取 account 的 消耗数据
# # self.pool_for_customer.map(
# # self.get_customer_cost_last_day, self.customer_under_mcc
# # )
# # self.group.map(
# # self.get_customer_cost_last_day, self.customer_under_mcc
# # )
#
# # 4 数据filter 后汇总, 发送邮件
#
#
# def _init(self, parent_mcc=None):
# credentials = google.oauth2.credentials.Credentials(**self.credentials)
# if parent_mcc is None:
# client = google.ads.google_ads.client.GoogleAdsClient(credentials, settings.GG_DEVELOP_TOKEN)
# else:
# client = google.ads.google_ads.client.GoogleAdsClient(credentials, settings.GG_DEVELOP_TOKEN, login_customer_id=parent_mcc)
# return client
#
# def get_customer_names(self):
# customer_service = self.client.get_service('CustomerService', version='v2')
# customers = customer_service.list_accessible_customers()
# resource_names = customers.resource_names
# return resource_names
#
#
# def get_mcc_and_account_under_user(self, resource_name):
# ga_service = self.client.get_service('GoogleAdsService', version='v2')
# customer_id = resource_name.split("/")[-1]
# query = """
# SELECT
# customer.manager,
# customer.test_account,
# customer.time_zone,
# customer.currency_code,
# customer.descriptive_name,
# customer.id
# FROM
# customer
# """
#
# try:
# customer_results = ga_service.search(customer_id, query=query)
# for c in customer_results:
#
# if c.customer.test_account.value:
# return None
#
# if not c.customer.manager.value:
# self.customer_under_user.append(str(c.customer.id.value))
#
# if c.customer.manager.value:
# return str(c.customer.id.value)
# except:
# return None
#
# def get_customer_cost_last_day(self, ga_service_customer_id):
# customer_id, ga_service = ga_service_customer_id
# query = """
# SELECT
# customer.descriptive_name,
# metrics.cost_micros
# FROM
# customer
# WHERE
# segments.date BETWEEN '{}' AND '{}'
# LIMIT
# 100
#
# """.format(YESTERDAY, YESTERDAY)
#
# try:
# results = ga_service.search(customer_id, query=query)
# # data = {
# # "account_id": google_format(customer_id),
# # "account_name":"",
# # "cost": 0,
# # "dt": YESTERDAY,
# # }
# #
# account_name = ""
# cost = 0
# account_id = google_format(customer_id)
# for r in results:
# account_name = r.customer.descriptive_name.value
# try:
# cost = r.metrics.cost_micros.value
# except:
# cost = ""
# else:
# if cost > 0:
# cost = micros_to_currency(cost)
# tuple_data = (
# account_id,
# account_name,
# cost
# )
# self.account_cost_list.append(tuple_data)
# # logging.info(data)
# # logging.info(len(self.account_cost_list))
# except:
# ...
#
# def get_customer_under_mcc(self, mcc_id):
# if mcc_id:
# query = """
# SELECT
# customer_client.id,
# customer_client.descriptive_name,
# customer_client.currency_code,
# customer_client.time_zone,
# customer_client.manager,
# customer_client.level
# FROM
# customer_client
# """
#
# client = self._init(mcc_id)
# ga_service = client.get_service('GoogleAdsService', version='v2')
#
# # 嵌套 共用一个client , 防止每次递归都重新创建
#
# def inner(parent_mcc, mcc_id):
# # logging.info(len(self.customer_under_mcc))
# mcc_list = []
# try:
# results = ga_service.search(customer_id=mcc_id, query=query, retry=False)
# for item in results:
# customer_id = str(item.customer_client.id.value)
# is_manager = item.customer_client.manager.value
# level = item.customer_client.level.value
#
# if is_manager == False:
# self.customer_under_mcc.append((customer_id, ga_service))
# # self.pool()
# self.pool.spawn(self.get_customer_cost_last_day ,(customer_id, ga_service))
# # logging.info(g.value)
#
# if level !=0 and is_manager == True:
# mcc_list.append(customer_id)
#
# except Exception as e:
# logging.info(e)
#
# # self.pool.map(
# # partial(self.get_customer_cost_last_day, ga_service),
# # customer_ids
# # )
#
# # self.pool_for_mcc.map(
# # partial(inner, parent_mcc),
# # mcc_list
# # )
# self.pool.map(
# partial(inner, parent_mcc),
# mcc_list
# )
#
# inner(mcc_id, mcc_id)
# else:
# ...
def micros_to_currency(micros):
    """Convert an amount in micros (millionths) to currency units as a float."""
    micros_per_unit = 1000000.0
    return micros / micros_per_unit
def currency_to_micros(currency_money):
    """Convert an amount in currency units to micros (multiply by one million)."""
    micros_per_unit = 10 ** 6
    return currency_money * micros_per_unit
def google_format(n):
    """Insert dashes into an account id, e.g. 1234567890 -> "123-456-7890".

    All digits except the last are grouped in threes from the right; the last
    digit is then appended to the final group.
    """
    text = str(n)
    grouped = f"{int(text[:-1]):,}"
    return grouped.replace(",", "-") + text[-1]
def format_cost_float(cost):
    """Round *cost* (a value with six decimals) to two decimal places.

    A tiny offset is added first.
    TODO: 17.955 is stored by the computer as 17.954999999999...
    """
    nudged = cost + 1e-08
    return round(nudged, 2)
def compare_time(time_compare, time_to_compare):
    """Tell whether *time_to_compare* is on or before *time_compare*.

    Both arguments are "YYYY-MM-DD" strings.  The caller uses this to decide
    whether a bill started no later than the cost date.

    Returns:
        ``True`` if *time_to_compare* <= *time_compare*, else ``False``.
    """
    date_format = "%Y-%m-%d"
    reference = datetime.datetime.strptime(time_compare, date_format)
    candidate = datetime.datetime.strptime(time_to_compare, date_format)
    return candidate <= reference
if __name__ == '__main__':
    # Script entry point: parse the options, configure logging and Django
    # settings, collect the cost of every configured user and hand the result
    # to CostExcelHandler (e-mail or database).

    # ``import logging`` alone does not load the ``logging.config`` submodule
    # that ``fileConfig`` below lives in.
    import logging.config

    BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    YESTERDAY = (datetime.datetime.today() - datetime.timedelta(days=1)).strftime(TIME_FORMAT)

    parser = argparse.ArgumentParser(description='google account cost')
    parser.add_argument("-p", "--produce_env", type=str,
                        help="环境, offline vs production",
                        default="offline")
    parser.add_argument('-d', '--execute_dir', type=str,
                        help='执行目录',
                        default=BASE_DIR)
    parser.add_argument('-date', '--cost_date', type=str,
                        help='获取消耗的日期, YYYY-MM-DD',
                        default=YESTERDAY)
    parser.add_argument('-task', '--task_type', type=str,
                        help='任务类型, email vs db',
                        default="email")
    args = parser.parse_args()

    # The date is interpolated into the queries and parsed again for every
    # account, so reject a malformed value once, up front, and normalise it.
    try:
        YESTERDAY = datetime.datetime.strptime(args.cost_date, TIME_FORMAT).strftime(TIME_FORMAT)
    except ValueError:
        parser.error(f"--cost_date must be YYYY-MM-DD, got {args.cost_date!r}")

    # Make the configuration and helper packages importable before the
    # deferred imports below.
    sys.path.append(os.path.join(args.execute_dir, "conf"))
    sys.path.append(args.execute_dir)
    sys.path.append(BASE_DIR)
    sys.path.append(os.path.join(BASE_DIR, "db"))

    logConfFile = os.path.join(args.execute_dir, 'conf/logging.cfg')
    logging.config.fileConfig(logConfFile)

    os.environ['DJANGO_SETTINGS_MODULE'] = r'settings'
    from django.conf import settings
    from utils import CostExcelHandler

    env = args.produce_env
    if env.lower() == "offline":
        os.environ['https_proxy'] = 'xxx'

    start = time.perf_counter()
    tasks = [
        GoogleThreadHandler(user_conf) for user_conf in settings.USER_INFO_LIST
    ]

    # Merge the per-user results; the keys are formatted account ids.
    final_dict = dict()
    for g in tasks:
        final_dict.update(g._run())

    handler = CostExcelHandler("google_cost", final_dict, YESTERDAY)
    task_type = args.task_type
    if task_type.lower() == "db":
        handler.save_cost_into_db()
    else:
        handler.send_email()

    logging.info(f"google_account cost with task: {task_type}, date: {YESTERDAY},"
                 f"env: {env}, exe_dir: {args.execute_dir}. finished after time spent: {(time.perf_counter() - start) / 60 } mins" )
# Ad-hoc query: positive cost per customer and country criterion for the
# fixed range 2020-06-01 .. 2020-06-17.  The resulting ``query`` string is not
# referenced anywhere in the visible code.
query = """
SELECT
customer.descriptive_name,
customer.id,
metrics.cost_micros,
geographic_view.country_criterion_id
FROM
geographic_view
WHERE
segments.date BETWEEN '{}' AND '{}'
AND
metrics.cost_micros > 0
""".format("2020-06-01","2020-06-17")