newmediamonitoring1/polls/utils.py

146 lines
5.3 KiB
Python

from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
import requests
import random
from parsel import Selector
from channels.db import database_sync_to_async
from .exceptions import ClientError
from django.conf import settings
from polls.models import FileMessage, ImageMessage, NormalMessage, URLMessage
from itertools import chain
def sent_sms_code(phone, code):
client = AcsClient(settings.SMS_ACCESS_KEY_ID,
settings.SMS_ACCESS_KEY_SECRET, settings.SMS_REGION)
request = CommonRequest()
request.set_accept_format('json')
request.set_domain('dysmsapi.aliyuncs.com')
request.set_method('POST')
request.set_protocol_type('https') # https | http
request.set_version('2017-05-25')
request.set_action_name('SendSms')
request.add_query_param('RegionId', "cn-hangzhou")
request.add_query_param('PhoneNumbers', phone)
request.add_query_param('SignName', "短信验证")
request.add_query_param('TemplateCode', "SMS_12330409")
request.add_query_param('TemplateParam', '{"number":"%s"}' % (code,))
response = client.do_action(request)
return response
def generate_code():
return random.randint(1000, 9999)
def detect_type(url):
if 'mp.weixin.qq.com' in url:
return 'weixin'
elif 'toutiao.com' in url:
return 'toutiao'
elif 'ixigua.com' in url:
return 'xigua'
else:
return 'other'
def parse(url):
t = detect_type(url)
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36 QBCore/3.53.1159.400 QQBrowser/9.0.2524.400 Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36 MicroMessenger/6.5.2.501 NetType/WIFI WindowsWechat",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
}
r = requests.get(url, headers=headers)
selector = Selector(text=r.text)
if t == 'weixin':
og_title = selector.xpath(
'//head/meta[@property="og:title"]/@content').get()
og_description = selector.xpath(
'//head/meta[@property="og:description"]/@content').get()
og_url = selector.xpath(
'//head/meta[@property="og:url"]/@content').get()
og_image = selector.xpath(
'//head/meta[@property="og:image"]/@content').get()
elif t == 'xigua':
og_title = selector.xpath(
'//head/meta[@property="og:title"]/@content').get()
og_description = selector.xpath(
'//head/meta[@property="og:description"]/@content').get()
og_url = selector.xpath(
'//head/meta[@property="og:url"]/@content').get()
og_image = selector.xpath(
'//head/meta[@property="og:image"]/@content').get()
elif t == 'toutiao':
og_title = selector.xpath('//head/title/text()')
og_description = selector.xpath(
'//head/meta[@name="description"]/@content').get()
og_url = url
og_image = None
else:
og_title = selector.xpath('//head/title/text()')
og_description = selector.xpath(
'//head/meta[@name="description"]/@content').get()
og_url = url
og_image = None
return (og_title, og_description, og_url, og_image)
@database_sync_to_async
def get_room_or_error(room_id, user):
"""
Tries to fetch a room for the user, checking permissions along the way.
"""
# Check if the user is logged in
if not user.is_authenticated:
raise ClientError("USER_HAS_TO_LOGIN")
@database_sync_to_async
def build_message(type, user_id, group_id, task_id, payload):
if type == 1:
URLMessage.objects.create(send_from__id=user_id, send_to__id=group_id,
task__id=task_id, title=payload.title, description=payload.description, url=payload.url, image=payload.image)
elif type == 1:
FileMessage.objects.create(send_from__id=user_id, send_to__id=group_id,
task__id=task_id, title=payload.title, file=payload.file)
elif type == 2:
ImageMessage.objects.create(send_from__id=user_id, send_to__id=group_id,
task__id=task_id, title=payload.title, image=payload.image)
else:
NormalMessage.objects.create(send_from__id=user_id, send_to__id=group_id,
category=0, task__id=task_id, content=payload.content)
def model_to_dict(instance, fields):
opts = instance._meta
data = {}
for f in chain(opts.concrete_fields, opts.private_fields, opts.many_to_many):
if not getattr(f, 'editable', False):
continue
if fields and f.name not in fields:
continue
if f.name == 'id':
data[f.name] = str(f.value_from_object(instance))
else:
data[f.name] = f.value_from_object(instance)
return data
def queryset_to_list(q, fields):
l = []
for row in q:
r = model_to_dict(row, fields)
l.append(r)
return l
if __name__ == '__main__':
# sent_sms_code('13993199566')
og_title, og_description, og_url, og_image = parse(
'https://mp.weixin.qq.com/s/EhX0Pm1e0FAfse0zz9ow8Q')
print(og_title, og_description, og_url, og_image)