newmediamonitoring/polls/utils.py

179 lines
6.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
from requests.auth import HTTPBasicAuth
import random
from parsel import Selector
from channels.db import database_sync_to_async
from django.conf import settings
from itertools import chain
from yunpian_python_sdk.model import constant as YC
from yunpian_python_sdk.ypclient import YunpianClient
from aliyunsdkdyvmsapi.request.v20170525 import SingleCallByTtsRequest
from aliyunsdkcore.client import AcsClient
import uuid
import json
from aliyunsdkcore.profile import region_provider
def send_tnps(phones, content):
url = 'https://api.tpns.tencent.com/v3/push/app'
r = requests.post(url,
auth=HTTPBasicAuth(
'1500015491', 'd5c793535529b6dc55d00b22302bad64'),
json={
"audience_type": "account_list",
"account_list": phones,
"message": {
"title": content,
"content": ''
},
"message_type": "notify",
"account_push_type": 1
})
return r
def sent_sms_code(phone, code):
clnt = YunpianClient('304eb08353f7ebf00596737acfc31f53')
param = {YC.MOBILE: phone,
YC.TEXT: '【甘肃大未来科技】您的验证码是%s。如非本人操作,请忽略本短信' % (code,)}
r = clnt.sms().single_send(param)
return r
def sent_sms_notify(phone, type):
clnt = YunpianClient('304eb08353f7ebf00596737acfc31f53')
if type == 1:
param = {YC.MOBILE: phone,
YC.TEXT: '【甘肃大未来科技】政务新媒体管理APP有新消息请及时登录处理。若其他管理员已处理请忽略*测试*'}
else:
param = {YC.MOBILE: phone, YC.TEXT: '【甘肃大未来科技】政务新媒体管理APP有新任务请及时登录处理。'}
r = clnt.sms().single_send(param)
return r
acs_client = AcsClient("LTAI4GDaSp41vp1X4mCb7uCa",
"am8fsIou5TZULV7mEAIEUwYfxN3cHK", "cn-hangzhou")
region_provider.add_endpoint(
"Dyvmsapi", "cn-hangzhou", "dyvmsapi.aliyuncs.com")
def send_voice_notify(phone, type):
business_id = uuid.uuid4()
ttsRequest = SingleCallByTtsRequest.SingleCallByTtsRequest()
# 申请的语音通知tts模板编码,必填
template_code = "TTS_205885979" if type == 1 else "TTS_205890042"
ttsRequest.set_TtsCode(template_code)
# 设置业务请求流水号,必填。后端服务基于此标识区分是否重复请求的判断
ttsRequest.set_OutId(business_id)
# 语音通知的被叫号码,必填。
ttsRequest.set_CalledNumber(phone)
# 语音通知显示号码,必填。
ttsRequest.set_CalledShowNumber("")
# 调用tts文本呼叫接口返回json
ttsResponse = acs_client.do_action_with_exception(ttsRequest)
return ttsResponse
def generate_code():
return random.randint(1000, 9999)
def detect_type(url):
if 'mp.weixin.qq.com' in url:
return 'weixin'
elif 'toutiao.com' in url:
return 'toutiao'
elif 'ixigua.com' in url:
return 'xigua'
elif 'gansudaily.com.cn' in url:
return 'xgs'
else:
return 'other'
def parse(url):
t = detect_type(url)
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36 QBCore/3.53.1159.400 QQBrowser/9.0.2524.400 Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36 MicroMessenger/6.5.2.501 NetType/WIFI WindowsWechat",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
}
r = requests.get(url, headers=headers)
print(r.encoding)
if t == 'xgs' and r.encoding=='ISO-8859-1':
r.encoding='GBK'
else:
r.encoding = 'utf-8'
# way2@8797799
selector = Selector(text=r.text)
if t == 'weixin':
og_title = selector.xpath(
'//head/meta[@property="og:title"]/@content').get()
og_description = selector.xpath(
'//head/meta[@property="og:description"]/@content').get()
og_url = selector.xpath(
'//head/meta[@property="og:url"]/@content').get()
og_image = selector.xpath(
'//head/meta[@property="og:image"]/@content').get()
elif t == 'toutiao':
og_title = selector.xpath('//head/title/text()').get()
og_description = selector.xpath(
'//head/meta[@name="description"]/@content').get()
og_url = url
og_image = None
elif t == 'xgs':
og_title = selector.xpath('//head/title/text()').get()
og_description = selector.xpath(
'//head/meta[@name="description"]/@content').get()
og_url = url
og_image = None
else:
og_title = selector.xpath('//head/title/text()').get()
og_description = selector.xpath(
'//head/meta[@name="description"]/@content').get()
og_url = url
og_image = None
return (og_title, og_description, og_url, og_image)
def model_to_dict(instance, fields):
opts = instance._meta
data = {}
for f in chain(opts.concrete_fields, opts.private_fields, opts.many_to_many):
if not getattr(f, 'editable', False):
continue
if fields and f.name not in fields:
continue
if f.name == 'id':
data[f.name] = str(f.value_from_object(instance))
else:
data[f.name] = f.value_from_object(instance)
return data
def queryset_to_list(q, fields):
l = []
for row in q:
r = model_to_dict(row, fields)
l.append(r)
return l
if __name__ == '__main__':
# r = sent_sms_code('13993199566', 4321)
# print(r.code(), type(r.code()))
# og_title, og_description, og_url, og_image = parse(
# 'https://mp.weixin.qq.com/s/EhX0Pm1e0FAfse0zz9ow8Q')
# og_title, og_description, og_url, og_image = parse(
# 'https://m.toutiao.com/i6883651337003729420/?tt_from=weixin&utm_campaign=client_share&app=news_article&utm_source=weixin&iid=1494959660475024&utm_medium=toutiao_android&wxshare_count=1')
# og_title, og_description, og_url, og_image = parse(
# 'http://www.gov.cn/xinwen/2020-10/13/content_5550906.htm')
# og_title, og_description, og_url, og_image = parse(
# 'http://gansu.gansudaily.com.cn/system/2021/01/25/030261998.shtml')
# print(og_title, og_description, og_url, og_image)
# print(send_voice_notify('13993199566'))
r = send_tnps(['13609346975'], 'conteeeeent')
print(r.status_code, r.text)