feat: add personnel and department manage pages

This commit is contained in:
2025-09-20 21:01:02 +08:00
parent ef9a5b5618
commit 4a0b845dff
29 changed files with 2617 additions and 18 deletions
+149 -1
View File
@@ -38,6 +38,12 @@ class EmailNotificationMiddleware(MiddlewareMixin):
self._handle_item_operation_async(request, response, user_info)
elif '/api/records/' in request.path or '/api/finance/' in request.path:
self._handle_finance_operation_async(request, response, user_info)
elif '/api/personnel/' in request.path:
self._handle_personnel_operation_async(request, response, user_info)
elif '/api/departments/' in request.path:
self._handle_department_operation_async(request, response, user_info)
elif '/api/project-groups/' in request.path:
self._handle_project_group_operation_async(request, response, user_info)
except Exception as e:
logger.error(f"邮件通知中间件处理失败: {e}")
@@ -137,7 +143,7 @@ class EmailNotificationMiddleware(MiddlewareMixin):
'department': finance_data.get('department', ''),
'category': finance_data.get('category', ''),
'approver': finance_data.get('approver', ''),
'timestamp': finance_data.get('transaction_date', ''),
'timestamp': finance_data.get('updated_at', ''),
'operation_path': request.path,
'operation_method': request.method
}
@@ -154,6 +160,148 @@ class EmailNotificationMiddleware(MiddlewareMixin):
thread.daemon = True # 设置为守护线程
thread.start()
def _handle_personnel_operation_async(self, request, response, user_info):
"""异步处理人员操作"""
def send_notification():
try:
operation_type = self._get_operation_type(request.method, request.path)
# 尝试从响应中获取数据
if hasattr(response, 'data'):
personnel_data = response.data
else:
try:
content = response.content.decode('utf-8')
personnel_data = json.loads(content) if content else {}
except:
personnel_data = {}
# 确保personnel_data不为None
if personnel_data is None:
personnel_data = {}
# 构建通知数据,使用正确的字段名
message = personnel_data['message', '']
personnel_data = personnel_data['data', '']
notification_data = {
'message': message,
'id': personnel_data.get('id', ''),
'name': personnel_data.get('name', ''),
'student_id': personnel_data.get('student_id', ''),
'gender': personnel_data.get('gender', ''),
'grader_major': personnel_data.get('grade_major', ''),
'department': personnel_data.get('department', ''),
'project_group': personnel_data.get('project_group', ''),
'position': personnel_data.get('position', ''),
'start_date': personnel_data.get('start_date', ''),
'end_date': personnel_data.get('end_date', ''),
'is_active': personnel_data.get('is_active', ''),
'phone': personnel_data.get('phone', ''),
'qq': personnel_data.get('qq', ''),
'email': personnel_data.get('email', ''),
'description': personnel_data.get('description', ''),
'timestamp': personnel_data.get('updated_at', ''),
'operation_path': request.path,
'operation_method': request.method
}
EmailNotificationService.send_operation_notification(
operation_type, '人员', notification_data, user_info
)
except Exception as e:
logger.error(f"异步处理人员操作通知失败: {e}")
# 创建并启动后台线程
thread = threading.Thread(target=send_notification)
thread.daemon = True # 设置为守护线程
thread.start()
def _handle_department_operation_async(self, request, response, user_info):
"""异步处理部门操作"""
def send_notification():
try:
operation_type = self._get_operation_type(request.method, request.path)
# 尝试从响应中获取数据
if hasattr(response, 'data'):
department_data = response.data
else:
try:
content = response.content.decode('utf-8')
department_data = json.loads(content) if content else {}
except:
department_data = {}
# 确保department_data不为None
if department_data is None:
department_data = {}
# 构建通知数据
notification_data = {
'id': department_data.get('id', ''),
'name': department_data.get('name', ''),
'timestamp': department_data.get('updated_at', ''),
'operation_path': request.path,
'operation_method': request.method
}
EmailNotificationService.send_operation_notification(
operation_type, '部门', notification_data, user_info
)
except Exception as e:
logger.error(f"异步处理部门操作通知失败: {e}")
# 创建并启动后台线程
thread = threading.Thread(target=send_notification)
thread.daemon = True # 设置为守护线程
thread.start()
def _handle_project_group_operation_async(self, request, response, user_info):
"""异步处理项目组操作"""
def send_notification():
try:
operation_type = self._get_operation_type(request.method, request.path)
# 尝试从响应中获取数据
if hasattr(response, 'data'):
project_group_data = response.data
else:
try:
content = response.content.decode('utf-8')
project_group_data = json.loads(content) if content else {}
except:
project_group_data = {}
# 确保project_group_data不为None
if project_group_data is None:
project_group_data = {}
# 构建通知数据
notification_data = {
'id': project_group_data.get('id', ''),
'name': project_group_data.get('name', ''),
'department': project_group_data.get('department', ''),
'department_name': project_group_data.get('department_name', ''),
'description': project_group_data.get('description', ''),
'timestamp': project_group_data.get('updated_at', ''),
'operation_path': request.path,
'operation_method': request.method
}
EmailNotificationService.send_operation_notification(
operation_type, '项目组', notification_data, user_info
)
except Exception as e:
logger.error(f"异步处理项目组操作通知失败: {e}")
# 创建并启动后台线程
thread = threading.Thread(target=send_notification)
thread.daemon = True # 设置为守护线程
thread.start()
def _get_operation_type(self, method, path):
"""根据HTTP方法和路径判断操作类型"""
if method == 'POST':
+61 -6
View File
@@ -15,7 +15,7 @@ from .serializers import (
class FinancialRecordViewSet(viewsets.ModelViewSet):
"""
API endpoint that allows financial records to be viewed or edited.
获取财务记录
"""
queryset = FinancialRecord.objects.all()
@@ -56,6 +56,7 @@ class FinancialRecordViewSet(viewsets.ModelViewSet):
'description': instance.description,
'record_type': instance.record_type,
'transaction_date': str(instance.transaction_date),
'timestamp': timezone.now().isoformat(),
'department': instance.department.name if instance.department else '',
'category': instance.category.name if instance.category else '',
'fund_manager': instance.fund_manager if hasattr(instance, 'fund_manager') else ''
@@ -76,7 +77,6 @@ class FinancialRecordViewSet(viewsets.ModelViewSet):
# 立即删除财务记录(会级联删除相关的凭证图片记录)
super().destroy(request, *args, **kwargs)
# 异步发送删除通知邮件
def send_delete_notification():
"""异步发送删除通知邮件"""
try:
@@ -96,7 +96,7 @@ class FinancialRecordViewSet(viewsets.ModelViewSet):
'category': record_info['category'],
'fund_manager': record_info['fund_manager'],
'transaction_date': record_info['transaction_date'],
'deleted_at': timezone.now().isoformat(),
'timestamp': timezone.now().isoformat(), # 删除条目的时间 :(
'operation_path': request.path,
'operation_method': request.method
}
@@ -147,7 +147,7 @@ class FinancialRecordViewSet(viewsets.ModelViewSet):
class ProofImageViewSet(viewsets.ModelViewSet):
"""
API endpoint that allows proof images to be viewed or edited.
凭证API
"""
queryset = ProofImage.objects.all()
serializer_class = ProofImageSerializer
@@ -182,15 +182,70 @@ class ProofImageViewSet(viewsets.ModelViewSet):
class DepartmentViewSet(viewsets.ModelViewSet):
"""
API endpoint that allows departments to be viewed or edited.
获取部门
"""
queryset = Department.objects.all()
serializer_class = DepartmentSerializer
def destroy(self, request, *args, **kwargs):
"""删除部门,发送邮件通知"""
from email_notice.services import EmailNotificationService
import threading
import logging
logger = logging.getLogger(__name__)
# 获取要删除的部门信息
department = self.get_object()
department_data = {
'id': department.id,
'name': department.name,
}
# 获取用户信息
user_info = self._get_user_info(request)
# 执行删除操作
response = super().destroy(request, *args, **kwargs)
# 异步发送删除通知邮件
def send_delete_notification():
try:
EmailNotificationService.send_operation_notification(
'DELETE', '部门', department_data, user_info
)
except Exception as e:
logger.error(f"发送部门删除通知邮件失败: {e}")
thread = threading.Thread(target=send_delete_notification)
thread.daemon = True
thread.start()
return response
def _get_user_info(self, request):
"""获取用户信息"""
try:
if hasattr(request, 'user') and request.user.is_authenticated:
return f"{request.user.username} ({request.user.email})"
else:
return f"匿名用户 (IP: {self._get_client_ip(request)})"
except:
return "未知用户"
def _get_client_ip(self, request):
"""获取客户端IP"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
class CategoryViewSet(viewsets.ModelViewSet):
"""
API endpoint that allows categories to be viewed or edited.
获取分类
"""
queryset = Category.objects.all()
serializer_class = CategorySerializer
+39
View File
@@ -44,9 +44,13 @@ INSTALLED_APPS = [
"django.contrib.staticfiles",
"rest_framework",
"corsheaders",
"django_filters",
"django_apscheduler",
"items",
"finance",
"email_notice",
"personnel",
"scheduler",
]
MIDDLEWARE = [
@@ -170,4 +174,39 @@ ADMINS = SECURE["SMTP"]["ADMINS"]
# Default primary key field type
# https://docs.djangoproject.com/en/5.2/ref/settings/#default-auto-field
# 日志配置
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'verbose': {
'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
'style': '{',
},
'simple': {
'format': '{levelname} {message}',
'style': '{',
},
},
'handlers': {
'file': {
'level': 'INFO',
'class': 'logging.FileHandler',
'filename': BASE_DIR / 'logs' / 'scheduler.log',
'formatter': 'verbose',
},
'console': {
'level': 'INFO',
'class': 'logging.StreamHandler',
'formatter': 'simple',
},
},
'loggers': {
'scheduler': {
'handlers': ['file', 'console'],
'level': 'INFO',
'propagate': True,
},
},
}
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
+1
View File
@@ -25,6 +25,7 @@ urlpatterns = [
path("", include("items.urls")),
path("", include("finance.urls")),
path("", include("email_notice.urls")),
path("", include("personnel.urls")),
path("api-auth/", include("rest_framework.urls")),
]
View File
+117
View File
@@ -0,0 +1,117 @@
from django.contrib import admin
from .models import Personnel, ProjectGroup
@admin.register(ProjectGroup)
class ProjectGroupAdmin(admin.ModelAdmin):
"""项目组管理"""
list_display = ['name', 'department', 'description', 'created_at']
list_filter = ['department', 'created_at']
search_fields = ['name', 'description']
ordering = ['department', 'name']
import django_filters
@admin.register(Personnel)
class PersonnelAdmin(admin.ModelAdmin):
"""人员信息管理"""
list_display = [
'name', 'student_id', 'department', 'project_group', 'position',
'is_active', 'start_date', 'end_date'
]
list_filter = [
'department', 'project_group', 'position', 'gender',
'is_active', 'start_date'
]
search_fields = ['name', 'student_id', 'phone', 'email', 'grade_major']
ordering = ['-is_active', 'department', 'position', 'name']
fieldsets = (
('基本信息', {
'fields': ('name', 'student_id', 'gender', 'grade_major')
}),
('职位信息', {
'fields': ('department', 'project_group', 'position', 'start_date', 'end_date', 'is_active')
}),
('联系方式', {
'fields': ('phone', 'qq', 'email')
}),
('其他信息', {
'fields': ('description',),
'classes': ('collapse',)
})
)
readonly_fields = ('created_at', 'updated_at')
def save_model(self, request, obj, form, change):
"""保存模型时的额外处理"""
super().save_model(request, obj, form, change)
def get_queryset(self, request):
"""优化查询"""
return super().get_queryset(request).select_related('department', 'project_group')
from .models import Personnel, ProjectGroup
from finance.models import Department
class PersonnelFilter(django_filters.FilterSet):
"""人员信息过滤器"""
# 基本筛选
department = django_filters.ModelChoiceFilter(
queryset=Department.objects.all(),
field_name='department',
label='部门'
)
project_group = django_filters.ModelChoiceFilter(
queryset=ProjectGroup.objects.all(),
field_name='project_group',
label='项目组'
)
position = django_filters.CharFilter(
field_name='position',
lookup_expr='icontains',
label='职位'
)
gender = django_filters.ChoiceFilter(
choices=Personnel.GENDER_CHOICES,
field_name='gender',
label='性别'
)
is_active = django_filters.BooleanFilter(
field_name='is_active',
label='在职状态'
)
# 日期范围筛选
start_date_from = django_filters.DateFilter(
field_name='start_date',
lookup_expr='gte',
label='任职开始时间(从)'
)
start_date_to = django_filters.DateFilter(
field_name='start_date',
lookup_expr='lte',
label='任职开始时间(到)'
)
end_date_from = django_filters.DateFilter(
field_name='end_date',
lookup_expr='gte',
label='任职结束时间(从)'
)
end_date_to = django_filters.DateFilter(
field_name='end_date',
lookup_expr='lte',
label='任职结束时间(到)'
)
class Meta:
model = Personnel
fields = ['department', 'project_group', 'position', 'gender', 'is_active']
+7
View File
@@ -0,0 +1,7 @@
from django.apps import AppConfig
class PersonnelConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'personnel'
verbose_name = '人员管理'
+67
View File
@@ -0,0 +1,67 @@
import django_filters
from django.db import models
from .models import Personnel, ProjectGroup
from finance.models import Department
class PersonnelFilter(django_filters.FilterSet):
"""人员信息过滤器"""
# 基本筛选
department = django_filters.ModelChoiceFilter(
queryset=Department.objects.all(),
field_name='department',
label='部门'
)
project_group = django_filters.ModelChoiceFilter(
queryset=ProjectGroup.objects.all(),
field_name='project_group',
label='项目组'
)
position = django_filters.CharFilter(
field_name='position',
lookup_expr='icontains',
label='职位'
)
gender = django_filters.ChoiceFilter(
choices=Personnel.GENDER_CHOICES,
field_name='gender',
label='性别'
)
is_active = django_filters.BooleanFilter(
field_name='is_active',
label='在职状态'
)
# 日期范围筛选
start_date_from = django_filters.DateFilter(
field_name='start_date',
lookup_expr='gte',
label='任职开始时间(从)'
)
start_date_to = django_filters.DateFilter(
field_name='start_date',
lookup_expr='lte',
label='任职开始时间(到)'
)
end_date_from = django_filters.DateFilter(
field_name='end_date',
lookup_expr='gte',
label='任职结束时间(从)'
)
end_date_to = django_filters.DateFilter(
field_name='end_date',
lookup_expr='lte',
label='任职结束时间(到)'
)
class Meta:
model = Personnel
fields = ['department', 'project_group', 'position', 'gender', 'is_active']
@@ -0,0 +1,57 @@
from django.core.management.base import BaseCommand
from django.utils import timezone
from personnel.models import Personnel
class Command(BaseCommand):
help = '检查人员任职结束时间,自动设置已到期人员为已卸任状态'
def add_arguments(self, parser):
parser.add_argument(
'--dry-run',
action='store_true',
help='仅显示将要更新的人员,不实际执行更新',
)
def handle(self, *args, **options):
today = timezone.now().date()
dry_run = options['dry_run']
# 查找任职结束时间已到期但仍在职的人员
expired_personnel = Personnel.objects.filter(
end_date__lte=today,
is_active=True
)
if not expired_personnel.exists():
self.stdout.write(
self.style.SUCCESS('没有需要设置为已卸任状态的人员')
)
return
self.stdout.write(
self.style.WARNING(f'发现 {expired_personnel.count()} 名任职已到期的人员:')
)
updated_count = 0
for person in expired_personnel:
self.stdout.write(
f' - {person.name} (学号: {person.student_id}) '
f'任职结束时间: {person.end_date} '
f'部门: {person.department.name} '
f'职位: {person.position}'
)
if not dry_run:
person.is_active = False
person.save()
updated_count += 1
if dry_run:
self.stdout.write(
self.style.WARNING('这是预览模式,未实际更新任何数据')
)
else:
self.stdout.write(
self.style.SUCCESS(f'成功将 {updated_count} 名人员设置为已卸任状态')
)
@@ -0,0 +1,88 @@
from django.core.management.base import BaseCommand
from finance.models import Department
from personnel.models import ProjectGroup
class Command(BaseCommand):
help = '创建示例的项目组数据'
def handle(self, *args, **options):
# 确保部门存在,如果不存在则创建
departments_data = [
'爱特工作室本部',
'程序部',
'Web部',
'游戏部',
'IOS部',
'APP部',
'UI部',
'智能应用部',
'OpenHarmony部',
'FOSS部'
]
self.stdout.write('正在创建部门...')
for dept_name in departments_data:
dept, created = Department.objects.get_or_create(name=dept_name)
if created:
self.stdout.write(f' 创建部门: {dept_name}')
else:
self.stdout.write(f' 部门已存在: {dept_name}')
# 创建项目组数据
project_groups_data = [
# 程序部项目组
{'name': '后端开发组', 'department_name': '程序部', 'description': '负责后端系统开发与维护'},
{'name': '算法研究组', 'department_name': '程序部', 'description': '专注算法研究与优化'},
# Web部项目组
{'name': '前端开发组', 'department_name': 'Web部', 'description': '负责前端页面开发'},
{'name': 'UI设计组', 'department_name': 'Web部', 'description': '负责用户界面设计'},
# 游戏部项目组
{'name': 'Unity开发组', 'department_name': '游戏部', 'description': '使用Unity引擎开发游戏'},
{'name': '游戏策划组', 'department_name': '游戏部', 'description': '负责游戏策划与设计'},
# IOS部项目组
{'name': 'iOS原生开发组', 'department_name': 'IOS部', 'description': '开发iOS原生应用'},
{'name': 'Swift开发组', 'department_name': 'IOS部', 'description': '使用Swift语言开发'},
# APP部项目组
{'name': 'Android开发组', 'department_name': 'APP部', 'description': '开发Android应用'},
{'name': '跨平台开发组', 'department_name': 'APP部', 'description': '使用Flutter、React Native等开发'},
# UI部项目组
{'name': '视觉设计组', 'department_name': 'UI部', 'description': '负责视觉设计与品牌设计'},
{'name': '交互设计组', 'department_name': 'UI部', 'description': '负责用户体验与交互设计'},
# 智能应用部项目组
{'name': 'AI研发组', 'department_name': '智能应用部', 'description': '人工智能技术研发'},
{'name': '机器学习组', 'department_name': '智能应用部', 'description': '机器学习算法应用'},
# OpenHarmony部项目组
{'name': 'HarmonyOS开发组', 'department_name': 'OpenHarmony部', 'description': '开发HarmonyOS应用'},
{'name': '鸿蒙系统组', 'department_name': 'OpenHarmony部', 'description': '鸿蒙生态系统开发'},
# FOSS部项目组
{'name': '开源项目组', 'department_name': 'FOSS部', 'description': '维护和开发开源项目'},
{'name': 'Linux系统组', 'department_name': 'FOSS部', 'description': 'Linux系统维护与开发'},
]
self.stdout.write('正在创建项目组...')
for group_data in project_groups_data:
try:
department = Department.objects.get(name=group_data['department_name'])
group, created = ProjectGroup.objects.get_or_create(
name=group_data['name'],
department=department,
defaults={'description': group_data['description']}
)
if created:
self.stdout.write(f' 创建项目组: {group_data["name"]} ({group_data["department_name"]})')
else:
self.stdout.write(f' 项目组已存在: {group_data["name"]}')
except Department.DoesNotExist:
self.stdout.write(self.style.ERROR(f' 部门不存在: {group_data["department_name"]}'))
self.stdout.write(self.style.SUCCESS('示例数据创建完成!'))
self.stdout.write('现在您可以在人员管理界面中看到这些项目组选项了。')
+124
View File
@@ -0,0 +1,124 @@
from django.db import models
from django.core.validators import RegexValidator
from finance.models import Department
class ProjectGroup(models.Model):
"""项目组模型"""
name = models.CharField(max_length=100, unique=True, verbose_name="项目组名称")
department = models.ForeignKey(Department, on_delete=models.CASCADE, verbose_name="所属部门")
description = models.TextField(blank=True, null=True, verbose_name="项目组描述")
created_at = models.DateTimeField(auto_now_add=True, verbose_name="创建时间")
class Meta:
verbose_name = "项目组"
verbose_name_plural = verbose_name
ordering = ['department', 'name']
def __str__(self):
return f"{self.department.name} - {self.name}"
class Personnel(models.Model):
"""人员信息模型"""
GENDER_CHOICES = [
('male', ''),
('female', ''),
]
# 基本信息
name = models.CharField(max_length=50, verbose_name="姓名")
student_id = models.CharField(
max_length=20,
unique=True,
verbose_name="学号",
validators=[RegexValidator(
regex=r'^\d{8,12}$',
message='学号应为8-12位数字'
)]
)
gender = models.CharField(max_length=10, choices=GENDER_CHOICES, verbose_name="性别")
grade_major = models.CharField(max_length=100, verbose_name="年级专业")
# 职位信息
department = models.ForeignKey(Department, on_delete=models.CASCADE, verbose_name="所属部门")
project_group = models.ForeignKey(
ProjectGroup,
on_delete=models.SET_NULL,
null=True,
blank=True,
verbose_name="项目组"
)
position = models.CharField(max_length=50, verbose_name="职位")
start_date = models.DateField(verbose_name="任职开始时间")
end_date = models.DateField(null=True, blank=True, verbose_name="任职结束时间")
is_active = models.BooleanField(default=True, verbose_name="是否在职")
# 联系方式
phone = models.CharField(
max_length=11,
verbose_name="手机号",
validators=[RegexValidator(
regex=r'^1[3-9]\d{9}$',
message='请输入有效的手机号码'
)]
)
qq = models.CharField(
max_length=15,
verbose_name="QQ号",
validators=[RegexValidator(
regex=r'^\d{5,15}$',
message='QQ号应为5-15位数字'
)]
)
email = models.EmailField(verbose_name="邮箱")
# 详细信息
description = models.TextField(blank=True, null=True, verbose_name="备注信息")
# 时间戳
created_at = models.DateTimeField(auto_now_add=True, verbose_name="创建时间")
updated_at = models.DateTimeField(auto_now=True, verbose_name="更新时间")
class Meta:
verbose_name = "人员信息"
verbose_name_plural = verbose_name
ordering = ['-is_active', 'department', 'position', 'name']
indexes = [
models.Index(fields=['department', 'is_active']),
models.Index(fields=['student_id']),
]
def __str__(self):
return f"{self.name} ({self.department.name} - {self.position})"
@property
def status_display(self):
"""返回在职状态显示"""
if self.is_active:
return "在职"
return "已卸任"
def save(self, *args, **kwargs):
"""保存时自动更新在职状态"""
from django.utils import timezone
# 如果设置了结束时间且已到期,自动设置为已卸任
if self.end_date and self.end_date <= timezone.now().date():
self.is_active = False
super().save(*args, **kwargs)
@classmethod
def check_and_update_expired_personnel(cls):
"""检查并更新所有已到期的人员状态"""
from django.utils import timezone
today = timezone.now().date()
expired_personnel = cls.objects.filter(
end_date__lte=today,
is_active=True
)
updated_count = expired_personnel.update(is_active=False)
return updated_count, list(expired_personnel.values_list('name', flat=True))
+73
View File
@@ -0,0 +1,73 @@
from rest_framework import serializers
from .models import Personnel, ProjectGroup
from finance.models import Department
class ProjectGroupSerializer(serializers.ModelSerializer):
"""项目组序列化器"""
department_name = serializers.CharField(source='department.name', read_only=True)
class Meta:
model = ProjectGroup
fields = ['id', 'name', 'department', 'department_name', 'description', 'created_at']
read_only_fields = ['created_at']
class PersonnelReadSerializer(serializers.ModelSerializer):
"""人员信息读取序列化器"""
department_name = serializers.CharField(source='department.name', read_only=True)
project_group_name = serializers.CharField(source='project_group.name', read_only=True)
position_display = serializers.CharField(source='position', read_only=True)
gender_display = serializers.CharField(source='get_gender_display', read_only=True)
status_display = serializers.CharField(read_only=True)
class Meta:
model = Personnel
fields = [
'id', 'name', 'student_id', 'gender', 'gender_display', 'grade_major',
'department', 'department_name', 'project_group', 'project_group_name',
'position', 'position_display', 'start_date', 'end_date', 'is_active',
'status_display', 'phone', 'qq', 'email', 'description',
'created_at', 'updated_at'
]
class PersonnelWriteSerializer(serializers.ModelSerializer):
"""人员信息写入序列化器"""
class Meta:
model = Personnel
fields = [
'id', 'name', 'student_id', 'gender', 'grade_major',
'department', 'project_group', 'position', 'start_date', 'end_date',
'is_active', 'phone', 'qq', 'email', 'description'
]
def validate_student_id(self, value):
"""验证学号唯一性"""
if self.instance and self.instance.student_id == value:
return value
if Personnel.objects.filter(student_id=value).exists():
raise serializers.ValidationError("该学号已存在")
return value
def validate_project_group(self, value):
"""验证项目组是否属于选定的部门"""
if value and hasattr(self, 'initial_data'):
department_id = self.initial_data.get('department')
if department_id and value.department_id != int(department_id):
raise serializers.ValidationError("项目组必须属于选定的部门")
return value
def validate(self, attrs):
"""整体验证"""
start_date = attrs.get('start_date')
end_date = attrs.get('end_date')
if start_date and end_date and start_date >= end_date:
raise serializers.ValidationError({
'end_date': '任职结束时间必须晚于开始时间'
})
return attrs
+11
View File
@@ -0,0 +1,11 @@
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from .views import PersonnelViewSet, ProjectGroupViewSet
router = DefaultRouter()
router.register(r'personnel', PersonnelViewSet)
router.register(r'project-groups', ProjectGroupViewSet)
urlpatterns = [
path('api/', include(router.urls)),
]
+269
View File
@@ -0,0 +1,269 @@
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.filters import SearchFilter, OrderingFilter
from django.utils import timezone
import logging
from .models import Personnel, ProjectGroup
from .serializers import (
PersonnelReadSerializer,
PersonnelWriteSerializer,
ProjectGroupSerializer
)
from .filters import PersonnelFilter
logger = logging.getLogger(__name__)
class PersonnelViewSet(viewsets.ModelViewSet):
"""人员信息视图集"""
queryset = Personnel.objects.all()
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_class = PersonnelFilter
search_fields = ['name', 'student_id', 'phone', 'email', 'grade_major']
ordering_fields = ['created_at', 'start_date', 'end_date', 'name']
ordering = ['-is_active', 'department', 'position', 'name']
def get_serializer_class(self):
"""根据操作类型返回不同的序列化器"""
if self.action in ['list', 'retrieve']:
return PersonnelReadSerializer
return PersonnelWriteSerializer
@action(detail=False, methods=['get'])
def statistics(self, request):
"""获取人员统计信息"""
total_count = Personnel.objects.count()
active_count = Personnel.objects.filter(is_active=True).count()
inactive_count = total_count - active_count
# 按部门统计
department_stats = {}
for person in Personnel.objects.select_related('department'):
dept_name = person.department.name
if dept_name not in department_stats:
department_stats[dept_name] = {'total': 0, 'active': 0, 'inactive': 0}
department_stats[dept_name]['total'] += 1
if person.is_active:
department_stats[dept_name]['active'] += 1
else:
department_stats[dept_name]['inactive'] += 1
# 按职位统计
position_stats = {}
for person in Personnel.objects.all():
position = person.position
if position not in position_stats:
position_stats[position] = {'total': 0, 'active': 0, 'inactive': 0}
position_stats[position]['total'] += 1
if person.is_active:
position_stats[position]['active'] += 1
else:
position_stats[position]['inactive'] += 1
return Response({
'overview': {
'total': total_count,
'active': active_count,
'inactive': inactive_count
},
'by_department': department_stats,
'by_position': position_stats
})
@action(detail=True, methods=['post'])
def set_inactive(self, request, pk=None):
"""设置人员为已卸任状态"""
personnel = self.get_object()
personnel.is_active = False
personnel.end_date = timezone.now().date()
personnel.save()
serializer = self.get_serializer(personnel)
return Response({
'message': f'{personnel.name} 已设置为已卸任状态',
'data': serializer.data
})
@action(detail=True, methods=['post'])
def set_active(self, request, pk=None):
"""设置人员为在职状态"""
personnel = self.get_object()
personnel.is_active = True
personnel.end_date = None
personnel.save()
serializer = self.get_serializer(personnel)
return Response({
'message': f'{personnel.name} 已设置为在职状态',
'data': serializer.data
})
def destroy(self, request, *args, **kwargs):
"""删除人员,发送邮件通知"""
from email_notice.services import EmailNotificationService
import threading
# 获取要删除的人员信息
personnel = self.get_object()
personnel_data = {
'id': personnel.id,
'name': personnel.name,
'student_id': personnel.student_id,
'email': personnel.email,
'phone': personnel.phone,
'department_name': personnel.department.name if personnel.department else '',
'project_group_name': personnel.project_group.name if personnel.project_group else '',
'position': personnel.position,
'is_active': personnel.is_active,
'start_date': str(personnel.start_date),
'end_date': str(personnel.end_date) if personnel.end_date else '',
'timestamp': timezone.now().isoformat(),
}
# 获取用户信息
user_info = self._get_user_info(request)
# 执行删除操作
response = super().destroy(request, *args, **kwargs)
# 异步发送删除通知邮件
def send_delete_notification():
try:
EmailNotificationService.send_operation_notification(
'DELETE', '人员', personnel_data, user_info
)
except Exception as e:
logger.error(f"发送人员删除通知邮件失败: {e}")
thread = threading.Thread(target=send_delete_notification)
thread.daemon = True
thread.start()
return response
def _get_user_info(self, request):
"""获取用户信息"""
try:
if hasattr(request, 'user') and request.user.is_authenticated:
return f"{request.user.username} ({request.user.email})"
else:
return f"匿名用户 (IP: {self._get_client_ip(request)})"
except:
return "未知用户"
def _get_client_ip(self, request):
"""获取客户端IP"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
@action(detail=False, methods=['post'])
def check_expired(self, request):
"""检查并更新已到期的人员状态"""
updated_count, updated_names = Personnel.check_and_update_expired_personnel()
if updated_count > 0:
return Response({
'message': f'成功将 {updated_count} 名人员设置为已卸任状态',
'updated_personnel': updated_names,
'count': updated_count
})
else:
return Response({
'message': '没有需要更新的人员',
'updated_personnel': [],
'count': 0
})
class ProjectGroupViewSet(viewsets.ModelViewSet):
"""项目组视图集"""
queryset = ProjectGroup.objects.all()
serializer_class = ProjectGroupSerializer
filter_backends = [DjangoFilterBackend, SearchFilter]
filterset_fields = ['department']
search_fields = ['name', 'description']
ordering = ['department', 'name']
@action(detail=False, methods=['get'])
def by_department(self, request):
"""按部门获取项目组"""
department_id = request.query_params.get('department_id')
if department_id:
project_groups = ProjectGroup.objects.filter(department_id=department_id)
serializer = self.get_serializer(project_groups, many=True)
return Response(serializer.data)
return Response([])
def destroy(self, request, *args, **kwargs):
"""删除项目组,发送邮件通知"""
from email_notice.services import EmailNotificationService
import threading
# 获取要删除的项目组信息
project_group = self.get_object()
project_group_data = {
'id': project_group.id,
'name': project_group.name,
'department_name': project_group.department.name if project_group.department else '',
'description': project_group.description,
'created_at': str(project_group.created_at),
'operation_path': request.path,
'operation_method': request.method
}
# 获取用户信息
user_info = self._get_user_info(request)
# 执行删除操作
response = super().destroy(request, *args, **kwargs)
# 异步发送删除通知邮件
def send_delete_notification():
try:
EmailNotificationService.send_operation_notification(
'DELETE', '项目组', project_group_data, user_info
)
except Exception as e:
logger.error(f"发送项目组删除通知邮件失败: {e}")
thread = threading.Thread(target=send_delete_notification)
thread.daemon = True
thread.start()
return response
def _get_user_info(self, request):
"""获取用户信息"""
try:
if hasattr(request, 'user') and request.user.is_authenticated:
return f"{request.user.username} ({request.user.email})"
else:
return f"匿名用户 (IP: {self._get_client_ip(request)})"
except:
return "未知用户"
def _get_client_ip(self, request):
"""获取客户端IP"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
@action(detail=False, methods=['get'])
def by_department(self, request):
"""按部门获取项目组"""
department_id = request.query_params.get('department_id')
if department_id:
project_groups = ProjectGroup.objects.filter(department_id=department_id)
serializer = self.get_serializer(project_groups, many=True)
return Response(serializer.data)
return Response([])
Binary file not shown.
+10
View File
@@ -0,0 +1,10 @@
from django.apps import AppConfig
class SchedulerConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'scheduler'
def ready(self):
from . import jobs
jobs.start_scheduler()
+10
View File
@@ -0,0 +1,10 @@
from django.apps import AppConfig
class SchedulerConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'scheduler'
def ready(self):
from . import jobs
jobs.start_scheduler()
+66
View File
@@ -0,0 +1,66 @@
import logging
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJobExecution
from django_apscheduler import util
from personnel.models import Personnel
logger = logging.getLogger(__name__)
def check_expired_personnel():
"""检查并更新已到期的人员状态"""
try:
updated_count, updated_names = Personnel.check_and_update_expired_personnel()
if updated_count > 0:
logger.info(f"定时任务执行成功:将 {updated_count} 名人员设置为已卸任状态")
logger.info(f"更新的人员:{', '.join(updated_names)}")
else:
logger.info("定时任务执行成功:没有需要更新的人员")
except Exception as e:
logger.error(f"定时任务执行失败:{str(e)}")
@util.close_old_connections
def delete_old_job_executions(max_age=604_800):
"""删除旧的任务执行记录(默认保留7天)"""
DjangoJobExecution.objects.delete_old_job_executions(max_age)
def start_scheduler():
"""启动定时任务调度器"""
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), "default")
# 添加人员到期检测任务 - 每天早上8点执行
scheduler.add_job(
check_expired_personnel,
trigger="cron",
hour=8,
minute=0,
id="check_expired_personnel",
max_instances=1,
replace_existing=True,
)
logger.info("已添加人员到期检测定时任务:每天早上8:00执行")
# 添加清理旧任务记录的任务 - 每周执行一次
scheduler.add_job(
delete_old_job_executions,
trigger="cron",
day_of_week="mon",
hour=1,
minute=0,
id="delete_old_job_executions",
max_instances=1,
replace_existing=True,
)
logger.info("已添加清理旧任务记录任务:每周一凌晨1:00执行")
try:
logger.info("正在启动定时任务调度器...")
scheduler.start()
logger.info("定时任务调度器启动成功")
except KeyboardInterrupt:
logger.info("正在停止定时任务调度器...")
scheduler.shutdown()
logger.info("定时任务调度器已停止")