refactor message

This commit is contained in:
baoliang 2020-09-19 00:11:19 +08:00
parent 4dbfe71f70
commit 7847c45289
12 changed files with 110 additions and 199 deletions

View File

@ -1,5 +1,12 @@
from channels.routing import ProtocolTypeRouter
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import polls.routing
application = ProtocolTypeRouter({
# (http->django views is added by default)
'websocket': AuthMiddlewareStack(
URLRouter(
polls.routing.websocket_urlpatterns
)
),
})

View File

@ -85,6 +85,16 @@ TEMPLATES = [
WSGI_APPLICATION = 'NewMediaMonitoring.wsgi.application'
ASGI_APPLICATION = 'NewMediaMonitoring.routing.application'
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
"hosts": ["redis://:newmedia2020@210.72.82.249:6379/0"],
"symmetric_encryption_keys": [SECRET_KEY],
},
},
}
# Database
# https://docs.djangoproject.com/en/2.1/ref/settings/#databases

View File

@ -1,185 +1,47 @@
from django.conf import settings
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from .exceptions import ClientError
from .utils import get_room_or_error
import json
from asgiref.sync import async_to_sync
from channels.generic.websocket import AsyncWebsocketConsumer
class ChatConsumer(AsyncJsonWebsocketConsumer):
"""
This chat consumer handles websocket connections for chat clients.
It uses AsyncJsonWebsocketConsumer, which means all the handling functions
must be async functions, and any sync work (like ORM access) has to be
behind database_sync_to_async or sync_to_async. For more, read
http://channels.readthedocs.io/en/latest/topics/consumers.html
"""
##### WebSocket event handlers
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
"""
Called when the websocket is handshaking as part of initial connection.
"""
# Are they logged in?
if self.scope["user"].is_anonymous:
# Reject the connection
await self.close()
else:
# Accept the connection
await self.accept()
# Store which rooms the user has joined on this connection
self.rooms = set()
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = 'chat_%s' % self.room_name
async def receive_json(self, content):
"""
Called when we get a text frame. Channels will JSON-decode the payload
for us and pass it as the first argument.
"""
# Messages will have a "command" key we can switch on
command = content.get("command", None)
try:
if command == "join":
# Make them join the room
await self.join_room(content["room"])
elif command == "leave":
# Leave the room
await self.leave_room(content["room"])
elif command == "send":
await self.send_room(content["room"], content["message"])
except ClientError as e:
# Catch any errors and send it back
await self.send_json({"error": e.code})
async def disconnect(self, code):
"""
Called when the WebSocket closes for any reason.
"""
# Leave all the rooms we are still in
for room_id in list(self.rooms):
try:
await self.leave_room(room_id)
except ClientError:
pass
##### Command helper methods called by receive_json
async def join_room(self, room_id):
"""
Called by receive_json when someone sent a join command.
"""
# The logged-in user is in our scope thanks to the authentication ASGI middleware
room = await get_room_or_error(room_id, self.scope["user"])
# Send a join message if it's turned on
if settings.NOTIFY_USERS_ON_ENTER_OR_LEAVE_ROOMS:
await self.channel_layer.group_send(
room.group_name,
{
"type": "chat.join",
"room_id": room_id,
"username": self.scope["user"].username,
}
)
# Store that we're in the room
self.rooms.add(room_id)
# Add them to the group so they get room messages
# Join room group
await self.channel_layer.group_add(
room.group_name,
self.channel_name,
self.room_group_name,
self.channel_name
)
# Instruct their client to finish opening the room
await self.send_json({
"join": str(room.id),
"title": room.title,
})
async def leave_room(self, room_id):
"""
Called by receive_json when someone sent a leave command.
"""
# The logged-in user is in our scope thanks to the authentication ASGI middleware
room = await get_room_or_error(room_id, self.scope["user"])
# Send a leave message if it's turned on
if settings.NOTIFY_USERS_ON_ENTER_OR_LEAVE_ROOMS:
await self.channel_layer.group_send(
room.group_name,
{
"type": "chat.leave",
"room_id": room_id,
"username": self.scope["user"].username,
}
)
# Remove that we're in the room
self.rooms.discard(room_id)
# Remove them from the group so they no longer get room messages
await self.accept()
async def disconnect(self, close_code):
# Leave room group
await self.channel_layer.group_discard(
room.group_name,
self.channel_name,
self.room_group_name,
self.channel_name
)
# Instruct their client to finish closing the room
await self.send_json({
"leave": str(room.id),
})
async def send_room(self, room_id, message):
"""
Called by receive_json when someone sends a message to a room.
"""
# Check they are in this room
if room_id not in self.rooms:
raise ClientError("ROOM_ACCESS_DENIED")
# Get the room and send to the group about it
room = await get_room_or_error(room_id, self.scope["user"])
# Receive message from WebSocket
async def receive(self, text_data):
text_data_json = json.loads(text_data)
message = text_data_json['message']
# Send message to room group
await self.channel_layer.group_send(
room.group_name,
self.room_group_name,
{
"type": "chat.message",
"room_id": room_id,
"username": self.scope["user"].username,
"message": message,
'type': 'chat_message',
'message': message
}
)
##### Handlers for messages sent over the channel layer
# These helper methods are named by the types we send - so chat.join becomes chat_join
async def chat_join(self, event):
"""
Called when someone has joined our chat.
"""
# Send a message down to the client
await self.send_json(
{
"msg_type": settings.MSG_TYPE_ENTER,
"room": event["room_id"],
"username": event["username"],
},
)
async def chat_leave(self, event):
"""
Called when someone has left our chat.
"""
# Send a message down to the client
await self.send_json(
{
"msg_type": settings.MSG_TYPE_LEAVE,
"room": event["room_id"],
"username": event["username"],
},
)
# Receive message from room group
async def chat_message(self, event):
"""
Called when someone has messaged our chat.
"""
# Send a message down to the client
await self.send_json(
{
"msg_type": settings.MSG_TYPE_MESSAGE,
"room": event["room_id"],
"username": event["username"],
"message": event["message"],
},
)
message = event['message']
# Send message to WebSocket
await self.send(text_data=json.dumps({
'message': message
}))

17
polls/job.py Normal file
View File

@ -0,0 +1,17 @@
from background_task import background
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from polls.models import Message
@background(schedule=0)
def process_task(task, user_id):
channel_layer = get_channel_layer()
groups = task.groups
for g in groups:
async_to_sync(channel_layer.group_send)(g.id, {
"type": 0,
"send_from": user_id,
"group_id": g.id,
"content": g.content
})

View File

@ -70,7 +70,7 @@ class Task(models.Model):
ordering = ["-added"]
MESSAGE_TYPE_CHOICES = (
TASK_ADDITION_TYPE_CHOICES = (
(0, 'url'),
(1, 'file'),
(2, 'picture')
@ -81,7 +81,7 @@ class TaskAddition(models.Model):
id = models.UUIDField('id', primary_key=True, default=uuid.uuid4)
task = models.ForeignKey(Task, on_delete=models.CASCADE)
category = models.IntegerField(
'type', choices=MESSAGE_TYPE_CHOICES, default=0)
'category', choices=TASK_ADDITION_TYPE_CHOICES, default=1)
url = models.CharField('url', max_length=256, null=True, blank=True)
file = models.FileField(
upload_to='task/file/%Y/%m/%d/', null=True, blank=True)
@ -108,42 +108,35 @@ class Message(models.Model):
id = models.UUIDField('id', primary_key=True, default=uuid.uuid4)
send_from = models.ForeignKey(User, on_delete=models.CASCADE)
send_to = models.ForeignKey(Group, on_delete=models.CASCADE)
content = models.TextField('内容', null=True, blank=True)
task = models.ForeignKey(Task, on_delete=models.CASCADE)
added = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
class Meta:
abstract = True
ordering = ["-added"]
def __str__(self):
return self.send_from + ':' + self.send_toÒ
class MessageAddition(models.Model):
id = models.UUIDField('id', primary_key=True, default=uuid.uuid4)
category = models.IntegerField(
'category', choices=MESSAGE_TYPE_CHOICES, default=0)
message = models.ForeignKey(Message, on_delete=models.CASCADE)
class Meta:
abstract = True
class NormalMessage(Message):
content = models.TextField('内容', null=True, blank=True)
class URLMessage(MessageAddition):
class URLMessage(Message):
title = models.CharField('title', max_length=256, null=False)
description = models.CharField('description', max_length=512, null=False)
image = models.CharField('image', max_length=256, null=True, blank=True)
url = models.CharField('url', max_length=256, null=True, blank=True)
class FileMessage(MessageAddition):
class FileMessage(Message):
title = models.CharField('title', max_length=256, null=False)
file = models.CharField('file', max_length=256, null=False)
class ImageMessage(MessageAddition):
class ImageMessage(Message):
title = models.CharField('title', max_length=256, null=False)
picture = models.CharField('picture', max_length=256, null=False)

View File

@ -18,5 +18,7 @@ urlpatterns = [
path('medias/list/', views.medias, name='polls_medias'),
path('news/list/', views.news_list, name='polls_news'),
path('news/detail/', views.news_detail, name='polls_news_detail'),
path('monitor/statistics/', views.monitor_statistics, name='polls_monitor_statistics')
path('monitor/statistics/', views.monitor_statistics, name='polls_monitor_statistics'),
path('tasks/list/', views.tasks, name='polls_tasks_list'),
path('tasks/create/', views.create_task, name='polls_tasks_create'),
]

View File

@ -6,6 +6,7 @@ from parsel import Selector
from channels.db import database_sync_to_async
from .exceptions import ClientError
from django.conf import settings
from polls.models import FileMessage, ImageMessage, NormalMessage, URLMessage
def sent_sms_code(phone, code):
@ -94,11 +95,23 @@ def get_room_or_error(room_id, user):
raise ClientError("USER_HAS_TO_LOGIN")
def model_to_dict(o, fields):
result = dict()
for f in fields:
result[f] = o.f
return result
@database_sync_to_async
def build_message(type, user_id, group_id, task_id, payload):
if type == 1:
URLMessage.objects.create(send_from__id=user_id, send_to__id=group_id,
task__id=task_id, title=payload.title, description=payload.description, url=payload.url, image=payload.image)
elif type == 1:
FileMessage.objects.create(send_from__id=user_id, send_to__id=group_id,
task__id=task_id, title=payload.title, file=payload.file)
elif type == 2:
ImageMessage.objects.create(send_from__id=user_id, send_to__id=group_id,
task__id=task_id, title=payload.title, image=payload.image)
else:
NormalMessage.objects.create(send_from__id=user_id, send_to__id=group_id,
category=0, task__id=task_id, content=payload.content)
if __name__ == '__main__':

View File

@ -3,3 +3,4 @@ from .notice import notices, read_notice
from .media import medias, create_media
from .news import news_list, news_detail
from .monitor import monitor_statistics
from .task import tasks, create_task

Binary file not shown.

View File

@ -6,6 +6,7 @@ import datetime
from polls.decorators import polls_login_required
from polls.models import Task, TaskAddition
from polls.job import process_task
from django.core.exceptions import ObjectDoesNotExist
@ -35,20 +36,24 @@ def tasks(request):
def create_task(request):
if request.method == 'GET':
return HttpResponse(status=405)
user_id = request.user.id
user = request.user
content = request.POST.get('content')
if not content:
return JsonResponse({'status': 'error', 'message': '内容不能为空'})
groups = request.POST.getlist('groups',[])
task = Task.objects.create(created_by__id=user_id, content=content)
groups = request.POST.getlist('groups', [])
task = Task.objects.create(created_by=user, content=content)
task.add_groups(groups)
url = request.POST.get('url')
file = request.FILES.get('file')
picture = request.FILES.get('picture')
if not url:
urlAddtion = TaskAddition.objects.create(task=task,category=0, url=url)
urlAddtion = TaskAddition.objects.create(
task=task, category=0, url=url)
if not file:
fileAddtion = TaskAddition.objects.create(task=task,category=1, file=file)
fileAddtion = TaskAddition.objects.create(
task=task, category=1, file=file)
if not picture:
pictureAddtion = TaskAddition.objects.create(task=task,category=2, image=picture)
return JsonResponse({'status': 'success'})
pictureAddtion = TaskAddition.objects.create(
task=task, category=2, image=picture)
process_task(task, user)
return JsonResponse({'status': 'success'})

View File

@ -8,4 +8,5 @@ channels
requests
parsel
django-summernote
python-dateutil
python-dateutil
channels_redis[cryptography]