I recently picked up a task to optimize Django migrations, so I am writing down what I found before I forget it.
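Symptoms
- Django can roll back a single DML operation by default, but it does not treat the multiple DML operations in one migration file as a single transaction. (This holds on backends that cannot roll back DDL, such as MySQL; on backends that can, such as PostgreSQL, a migration runs inside one transaction by default.)
- Migration files often use migrations.RunPython to insert, update, or delete metadata (and, in rare cases, migrations.RunSQL).
- A single migration file often contains several RunPython calls. If one of them fails, the RunPython calls that already succeeded are not rolled back. For example: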

    from django.db import migrations


    def func_a(apps, schema_editor):
        do_db_cal()  # placeholder for the real database work


    def func_b(apps, schema_editor):
        do_db_cal()  # placeholder for the real database work


    class Migration(migrations.Migration):
        dependencies = [
            '...',
            '...'
        ]
        operations = [
            migrations.RunPython(func_a),
            migrations.RunPython(func_b),
        ]

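In the code above, if func_a succeeds and func_b raises an error, the changes made by func_a are not rolled back. Because the migration as a whole is not recorded as applied, the next migrate run executes func_a again.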
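The re-execution problem can be reproduced without Django. The following is a minimal sketch using the standard-library sqlite3 module, not Django's migration machinery: the table meta, the helper run_steps_separately, and the one-argument func_a, failing_func_b, and fixed_func_b are all hypothetical stand-ins, and each step is assumed to commit on its own.

```python
import sqlite3


def func_a(conn):
    # Stand-in for the first RunPython callable: inserts one metadata row.
    conn.execute("INSERT INTO meta (name) VALUES ('from_a')")


def failing_func_b(conn):
    # Stand-in for a second callable that fails.
    raise RuntimeError("func_b failed")


def fixed_func_b(conn):
    # The same step after the bug has been fixed.
    conn.execute("INSERT INTO meta (name) VALUES ('from_b')")


def run_steps_separately(conn, steps):
    # Each step gets its own transaction: "with conn" commits on success
    # and rolls back only the current step on an exception.
    for step in steps:
        with conn:
            step(conn)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE meta (name TEXT)")
conn.commit()

# First attempt: func_a commits, then the second step fails.
try:
    run_steps_separately(conn, [func_a, failing_func_b])
except RuntimeError:
    pass
rows_after_failure = conn.execute("SELECT name FROM meta ORDER BY rowid").fetchall()

# Retry after the fix: the whole list runs again, so func_a inserts a second row.
run_steps_separately(conn, [func_a, fixed_func_b])
rows_after_retry = conn.execute("SELECT name FROM meta ORDER BY rowid").fetchall()
```

After the failed first attempt the table already holds the row from func_a, and the retry adds it a second time. Unless the whole file runs in one transaction, such functions have to be written so that running them twice is harmless.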
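Analyzing the Django migration source code
To add transaction support for a whole migration file, the first step is to look at how the migrate operation is actually implemented.
As we all know, migrations are run with python manage.py migrate, so we start by finding the source of the migrate command.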

    class Command(BaseCommand):
        ...
        def handle(self, *args, **options):
            ...
            if not plan:
                ...
            else:
                fake = options.get("fake")
                fake_initial = options.get("fake_initial")
                executor.migrate(targets, plan, fake=fake, fake_initial=fake_initial)
            ...

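Only the part directly related to the migration itself is shown here (the migrate command has far too much code to quote in full). When we run python manage.py migrate with valid arguments in a working environment, the call that does the actual work is executor.migrate, so next we look at the migrate method of the executor object.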

    class MigrationExecutor(object):
        ...
        def migrate(self, targets, plan=None, fake=False, fake_initial=False):
            """
            Migrates the database up to the given targets.
            Django first needs to create all project states before a migration is
            (un)applied and in a second step run all the database operations.
            """
            if plan is None:
                plan = self.migration_plan(targets)
            # Create the forwards plan Django would follow on an empty database
            full_plan = self.migration_plan(self.loader.graph.leaf_nodes(), clean_start=True)
            all_forwards = all(not backwards for mig, backwards in plan)
            all_backwards = all(backwards for mig, backwards in plan)
            if not plan:
                pass  # Nothing to do for an empty plan
            elif all_forwards == all_backwards:
                # This should only happen if there's a mixed plan
                raise InvalidMigrationPlan(
                    "Migration plans with both forwards and backwards migrations "
                    "are not supported. Please split your migration process into "
                    "separate plans of only forwards OR backwards migrations.",
                    plan
                )
            elif all_forwards:
                self._migrate_all_forwards(plan, full_plan, fake=fake, fake_initial=fake_initial)
            else:
                # No need to check for `elif all_backwards` here, as that condition
                # would always evaluate to true.
                self._migrate_all_backwards(plan, full_plan, fake=fake)
            self.check_replacements()
        ...

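In the migrate method above, the comments make it easy to see that an ordinary forward migration, whether its operations are DDL or DML, ends up in self._migrate_all_forwards, so that is where we look next.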
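The branch selection in migrate can be isolated into a few lines. This is a sketch for illustration only: classify_plan and its string return values are hypothetical and not part of Django, and a plan is assumed to be a list of (migration, backwards) pairs as in the listing above. It also shows why the empty plan has to be tested first: for an empty list both all(...) checks are True, so the equality test alone would report a mixed plan.

```python
def classify_plan(plan):
    """Classify a list of (migration, backwards) pairs the way migrate() branches."""
    all_forwards = all(not backwards for _, backwards in plan)
    all_backwards = all(backwards for _, backwards in plan)
    if not plan:
        return "empty"
    if all_forwards == all_backwards:
        # For a non-empty plan the two flags are equal only when both are False.
        return "mixed"
    return "forwards" if all_forwards else "backwards"


# Migration names here are made up for the example.
examples = {
    "empty": classify_plan([]),
    "forwards": classify_plan([("0001_initial", False), ("0002_add_field", False)]),
    "backwards": classify_plan([("0002_add_field", True)]),
    "mixed": classify_plan([("0001_initial", False), ("0002_add_field", True)]),
}
```

Each entry in examples maps to its own key, so only the "forwards" case would reach the method examined next.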

    class MigrationExecutor(object):
        ...
        def _migrate_all_forwards(self, plan, full_plan, fake, fake_initial):
            """
            Take a list of 2-tuples of the form (migration instance, False) and
            apply them in the order they occur in the full_plan.
            """
            migrations_to_run = {m[0] for m in plan}
            state = ProjectState(real_apps=list(self.loader.unmigrated_apps))
            for migration, _ in full_plan:
                if not migrations_to_run:
                    # We remove every migration that we applied from this set so
                    # that we can bail out once the last migration has been applied
                    # and don't always run until the very end of the migration
                    # process.
                    break
                if migration in migrations_to_run:
                    if 'apps' not in state.__dict__:
                        if self.progress_callback:
                            self.progress_callback("render_start")
                        state.apps  # Render all -- performance critical
                        if self.progress_callback:
                            self.progress_callback("render_success")
                    state = self.apply_migration(state, migration, fake=fake, fake_initial=fake_initial)
                    migrations_to_run.remove(migration)
                else:
                    migration.mutate_state(state, preserve=False)

        def apply_migration(self, state, migration, fake=False, fake_initial=False):
            """
            Runs a migration forwards.
            """
            if self.progress_callback:
                self.progress_callback("apply_start", migration, fake)
            if not fake:
                if fake_initial:
                    # Test to see if this is an already-applied initial migration
                    applied, state = self.detect_soft_applied(state, migration)
                    if applied:
                        fake = True
                if not fake:
                    # Alright, do it normally
                    with self.connection.schema_editor() as schema_editor:
                        state = migration.apply(state, schema_editor)
            # For replacement migrations, record individual statuses
            if migration.replaces:
                for app_label, name in migration.replaces:
                    self.recorder.record_applied(app_label, name)
            else:
                self.recorder.record_applied(migration.app_label, migration.name)
            # Report progress
            if self.progress_callback:
                self.progress_callback("apply_success", migration, fake)
            return state
        ...

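The part that ultimately does the work here is:

    with self.connection.schema_editor() as schema_editor:
        state = migration.apply(state, schema_editor)

Readers who have looked at some Django source will recognize that apply is a method of the Migration class defined in every migration file, inherited from migrations.Migration. (Readers without that experience may need to keep tracing from this point; I will not write out the more detailed walk-through here, because it would be far too long.)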
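Whether a failure inside that with block undoes earlier operations depends on what the schema editor's context manager does on entry and exit. The sketch below is a toy model built on sqlite3, not Django's implementation: toy_schema_editor, toy_apply, and run_toy_migration are hypothetical names, and the flag is named after the can_rollback_ddl backend feature only to suggest the analogy. It assumes that the editor opens one transaction around the whole body when the flag is set, and that otherwise each operation commits by itself.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def toy_schema_editor(conn, can_rollback_ddl):
    # Toy model: with the flag set, the whole body is one transaction.
    if not can_rollback_ddl:
        yield conn
        return
    try:
        yield conn
    except Exception:
        conn.rollback()
        raise
    else:
        conn.commit()


def toy_apply(conn, operations, commit_each):
    # Run the operations in order, optionally committing after each one.
    for operation in operations:
        operation(conn)
        if commit_each:
            conn.commit()


def insert_row(conn):
    conn.execute("INSERT INTO meta (name) VALUES ('row')")


def fail(conn):
    raise RuntimeError("operation failed")


def run_toy_migration(can_rollback_ddl):
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE meta (name TEXT)")
    conn.commit()
    try:
        with toy_schema_editor(conn, can_rollback_ddl) as editor:
            toy_apply(editor, [insert_row, fail], commit_each=not can_rollback_ddl)
    except RuntimeError:
        pass
    # Discard anything left uncommitted before counting.
    conn.rollback()
    return conn.execute("SELECT COUNT(*) FROM meta").fetchone()[0]


rows_with_wrapping_transaction = run_toy_migration(can_rollback_ddl=True)
rows_without_wrapping_transaction = run_toy_migration(can_rollback_ddl=False)
```

The first call leaves 0 rows and the second leaves 1. Without a transaction around the whole body, work done before the failure stays committed, which matches the symptom described at the top.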
Result of the analysis
So what remains is to analyze the apply method and determine whether it supports transactions, or can be made to, and if so how.