from aliyunsdkcore.client import AcsClient from aliyunsdkcore.request import CommonRequest import requests import random from parsel import Selector from channels.db import database_sync_to_async from .exceptions import ClientError from django.conf import settings from polls.models import FileMessage, ImageMessage, NormalMessage, URLMessage from itertools import chain def sent_sms_code(phone, code): client = AcsClient(settings.SMS_ACCESS_KEY_ID, settings.SMS_ACCESS_KEY_SECRET, settings.SMS_REGION) request = CommonRequest() request.set_accept_format('json') request.set_domain('dysmsapi.aliyuncs.com') request.set_method('POST') request.set_protocol_type('https') # https | http request.set_version('2017-05-25') request.set_action_name('SendSms') request.add_query_param('RegionId', "cn-hangzhou") request.add_query_param('PhoneNumbers', phone) request.add_query_param('SignName', "短信验证") request.add_query_param('TemplateCode', "SMS_12330409") request.add_query_param('TemplateParam', '{"number":"%s"}' % (code,)) response = client.do_action(request) return response def generate_code(): return random.randint(1000, 9999) def detect_type(url): if 'mp.weixin.qq.com' in url: return 'weixin' elif 'toutiao.com' in url: return 'toutiao' elif 'ixigua.com' in url: return 'xigua' else: return 'other' def parse(url): t = detect_type(url) headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36 QBCore/3.53.1159.400 QQBrowser/9.0.2524.400 Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36 MicroMessenger/6.5.2.501 NetType/WIFI WindowsWechat", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", } r = requests.get(url, headers=headers) selector = Selector(text=r.text) if t == 'weixin': og_title = selector.xpath( '//head/meta[@property="og:title"]/@content').get() og_description = selector.xpath( '//head/meta[@property="og:description"]/@content').get() og_url = selector.xpath( '//head/meta[@property="og:url"]/@content').get() og_image = selector.xpath( '//head/meta[@property="og:image"]/@content').get() elif t == 'xigua': og_title = selector.xpath( '//head/meta[@property="og:title"]/@content').get() og_description = selector.xpath( '//head/meta[@property="og:description"]/@content').get() og_url = selector.xpath( '//head/meta[@property="og:url"]/@content').get() og_image = selector.xpath( '//head/meta[@property="og:image"]/@content').get() elif t == 'toutiao': og_title = selector.xpath('//head/title/text()') og_description = selector.xpath( '//head/meta[@name="description"]/@content').get() og_url = url og_image = None else: og_title = selector.xpath('//head/title/text()') og_description = selector.xpath( '//head/meta[@name="description"]/@content').get() og_url = url og_image = None return (og_title, og_description, og_url, og_image) @database_sync_to_async def get_room_or_error(room_id, user): """ Tries to fetch a room for the user, checking permissions along the way. """ # Check if the user is logged in if not user.is_authenticated: raise ClientError("USER_HAS_TO_LOGIN") @database_sync_to_async def build_message(type, user_id, group_id, task_id, payload): if type == 1: URLMessage.objects.create(send_from__id=user_id, send_to__id=group_id, task__id=task_id, title=payload.title, description=payload.description, url=payload.url, image=payload.image) elif type == 1: FileMessage.objects.create(send_from__id=user_id, send_to__id=group_id, task__id=task_id, title=payload.title, file=payload.file) elif type == 2: ImageMessage.objects.create(send_from__id=user_id, send_to__id=group_id, task__id=task_id, title=payload.title, image=payload.image) else: NormalMessage.objects.create(send_from__id=user_id, send_to__id=group_id, category=0, task__id=task_id, content=payload.content) def model_to_dict(instance, fields): opts = instance._meta data = {} for f in chain(opts.concrete_fields, opts.private_fields, opts.many_to_many): if not getattr(f, 'editable', False): continue if fields and f.name not in fields: continue if f.name == 'id': data[f.name] = str(f.value_from_object(instance)) else: data[f.name] = f.value_from_object(instance) return data def queryset_to_list(q, fields): l = [] for row in q: r = model_to_dict(row, fields) l.append(r) return l if __name__ == '__main__': # sent_sms_code('13993199566') og_title, og_description, og_url, og_image = parse( 'https://mp.weixin.qq.com/s/EhX0Pm1e0FAfse0zz9ow8Q') print(og_title, og_description, og_url, og_image)