generate new migration / fix migration files

This commit is contained in:
CREDO23 2025-07-24 11:37:36 +02:00
parent 90bfec6e7d
commit b2a19af1f7
12 changed files with 366 additions and 157 deletions

View file

@ -20,47 +20,97 @@ depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema - add LiteLLMProvider enum, LLMConfig table and user LLM preferences."""
# Check if enum type exists and create if it doesn't
# Create enum only if not exists
op.execute("""
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'litellmprovider') THEN
CREATE TYPE litellmprovider AS ENUM ('OPENAI', 'ANTHROPIC', 'GROQ', 'COHERE', 'HUGGINGFACE', 'AZURE_OPENAI', 'GOOGLE', 'AWS_BEDROCK', 'OLLAMA', 'MISTRAL', 'TOGETHER_AI', 'REPLICATE', 'PALM', 'VERTEX_AI', 'ANYSCALE', 'PERPLEXITY', 'DEEPINFRA', 'AI21', 'NLPCLOUD', 'ALEPH_ALPHA', 'PETALS', 'CUSTOM');
CREATE TYPE litellmprovider AS ENUM (
'OPENAI', 'ANTHROPIC', 'GROQ', 'COHERE', 'HUGGINGFACE',
'AZURE_OPENAI', 'GOOGLE', 'AWS_BEDROCK', 'OLLAMA', 'MISTRAL',
'TOGETHER_AI', 'REPLICATE', 'PALM', 'VERTEX_AI', 'ANYSCALE',
'PERPLEXITY', 'DEEPINFRA', 'AI21', 'NLPCLOUD', 'ALEPH_ALPHA',
'PETALS', 'CUSTOM'
);
END IF;
END$$;
""")
# Create llm_configs table using raw SQL to avoid enum creation conflicts
# Create llm_configs table only if it doesn't already exist
op.execute("""
CREATE TABLE llm_configs (
id SERIAL PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
name VARCHAR(100) NOT NULL,
provider litellmprovider NOT NULL,
custom_provider VARCHAR(100),
model_name VARCHAR(100) NOT NULL,
api_key TEXT NOT NULL,
api_base VARCHAR(500),
litellm_params JSONB,
user_id UUID NOT NULL REFERENCES "user"(id) ON DELETE CASCADE
)
DO $$
BEGIN
IF NOT EXISTS (
SELECT FROM information_schema.tables
WHERE table_name = 'llm_configs'
) THEN
CREATE TABLE llm_configs (
id SERIAL PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
name VARCHAR(100) NOT NULL,
provider litellmprovider NOT NULL,
custom_provider VARCHAR(100),
model_name VARCHAR(100) NOT NULL,
api_key TEXT NOT NULL,
api_base VARCHAR(500),
litellm_params JSONB,
user_id UUID NOT NULL REFERENCES "user"(id) ON DELETE CASCADE
);
END IF;
END$$;
""")
# Create indexes
op.create_index(op.f('ix_llm_configs_id'), 'llm_configs', ['id'], unique=False)
op.create_index(op.f('ix_llm_configs_created_at'), 'llm_configs', ['created_at'], unique=False)
op.create_index(op.f('ix_llm_configs_name'), 'llm_configs', ['name'], unique=False)
# Add LLM preference columns to user table
op.add_column('user', sa.Column('long_context_llm_id', sa.Integer(), nullable=True))
op.add_column('user', sa.Column('fast_llm_id', sa.Integer(), nullable=True))
op.add_column('user', sa.Column('strategic_llm_id', sa.Integer(), nullable=True))
# Create foreign key constraints for LLM preferences
op.create_foreign_key(op.f('fk_user_long_context_llm_id_llm_configs'), 'user', 'llm_configs', ['long_context_llm_id'], ['id'], ondelete='SET NULL')
op.create_foreign_key(op.f('fk_user_fast_llm_id_llm_configs'), 'user', 'llm_configs', ['fast_llm_id'], ['id'], ondelete='SET NULL')
op.create_foreign_key(op.f('fk_user_strategic_llm_id_llm_configs'), 'user', 'llm_configs', ['strategic_llm_id'], ['id'], ondelete='SET NULL')
# Create indexes if they don't exist
op.execute("""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_indexes
WHERE tablename = 'llm_configs' AND indexname = 'ix_llm_configs_id'
) THEN
CREATE INDEX ix_llm_configs_id ON llm_configs(id);
END IF;
IF NOT EXISTS (
SELECT 1 FROM pg_indexes
WHERE tablename = 'llm_configs' AND indexname = 'ix_llm_configs_created_at'
) THEN
CREATE INDEX ix_llm_configs_created_at ON llm_configs(created_at);
END IF;
IF NOT EXISTS (
SELECT 1 FROM pg_indexes
WHERE tablename = 'llm_configs' AND indexname = 'ix_llm_configs_name'
) THEN
CREATE INDEX ix_llm_configs_name ON llm_configs(name);
END IF;
END$$;
""")
# Safely add columns to user table
bind = op.get_bind()
inspector = sa.inspect(bind)
existing_columns = [col["name"] for col in inspector.get_columns("user")]
with op.batch_alter_table('user') as batch_op:
if 'long_context_llm_id' not in existing_columns:
batch_op.add_column(sa.Column('long_context_llm_id', sa.Integer(), nullable=True))
batch_op.create_foreign_key(op.f('fk_user_long_context_llm_id_llm_configs'),
'llm_configs', ['long_context_llm_id'], ['id'],
ondelete='SET NULL')
if 'fast_llm_id' not in existing_columns:
batch_op.add_column(sa.Column('fast_llm_id', sa.Integer(), nullable=True))
batch_op.create_foreign_key(op.f('fk_user_fast_llm_id_llm_configs'),
'llm_configs', ['fast_llm_id'], ['id'],
ondelete='SET NULL')
if 'strategic_llm_id' not in existing_columns:
batch_op.add_column(sa.Column('strategic_llm_id', sa.Integer(), nullable=True))
batch_op.create_foreign_key(op.f('fk_user_strategic_llm_id_llm_configs'),
'llm_configs', ['strategic_llm_id'], ['id'],
ondelete='SET NULL')
def downgrade() -> None:

View file

@ -8,8 +8,8 @@ from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSON
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy import inspect
# revision identifiers, used by Alembic.
revision: str = "12"
@ -20,52 +20,72 @@ depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema - add LogLevel and LogStatus enums and logs table."""
# Create LogLevel enum
# Create LogLevel enum if it doesn't exist
op.execute("""
CREATE TYPE loglevel AS ENUM ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'loglevel') THEN
CREATE TYPE loglevel AS ENUM ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL');
END IF;
END$$;
""")
# Create LogStatus enum
# Create LogStatus enum if it doesn't exist
op.execute("""
CREATE TYPE logstatus AS ENUM ('IN_PROGRESS', 'SUCCESS', 'FAILED')
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'logstatus') THEN
CREATE TYPE logstatus AS ENUM ('IN_PROGRESS', 'SUCCESS', 'FAILED');
END IF;
END$$;
""")
# Create logs table
# Create logs table if it doesn't exist
op.execute("""
CREATE TABLE logs (
CREATE TABLE IF NOT EXISTS logs (
id SERIAL PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
level loglevel NOT NULL,
status logstatus NOT NULL,
message TEXT NOT NULL,
source VARCHAR(200),
log_metadata JSONB DEFAULT '{}',
search_space_id INTEGER NOT NULL REFERENCES searchspaces(id) ON DELETE CASCADE
)
);
""")
# Create indexes
op.create_index(op.f('ix_logs_id'), 'logs', ['id'], unique=False)
op.create_index(op.f('ix_logs_created_at'), 'logs', ['created_at'], unique=False)
op.create_index(op.f('ix_logs_level'), 'logs', ['level'], unique=False)
op.create_index(op.f('ix_logs_status'), 'logs', ['status'], unique=False)
op.create_index(op.f('ix_logs_source'), 'logs', ['source'], unique=False)
# Get existing indexes
conn = op.get_bind()
inspector = inspect(conn)
existing_indexes = [idx['name'] for idx in inspector.get_indexes('logs')]
# Create indexes only if they don't already exist
if 'ix_logs_id' not in existing_indexes:
op.create_index('ix_logs_id', 'logs', ['id'])
if 'ix_logs_created_at' not in existing_indexes:
op.create_index('ix_logs_created_at', 'logs', ['created_at'])
if 'ix_logs_level' not in existing_indexes:
op.create_index('ix_logs_level', 'logs', ['level'])
if 'ix_logs_status' not in existing_indexes:
op.create_index('ix_logs_status', 'logs', ['status'])
if 'ix_logs_source' not in existing_indexes:
op.create_index('ix_logs_source', 'logs', ['source'])
def downgrade() -> None:
"""Downgrade schema - remove logs table and enums."""
# Drop indexes
op.drop_index(op.f('ix_logs_source'), table_name='logs')
op.drop_index(op.f('ix_logs_status'), table_name='logs')
op.drop_index(op.f('ix_logs_level'), table_name='logs')
op.drop_index(op.f('ix_logs_created_at'), table_name='logs')
op.drop_index(op.f('ix_logs_id'), table_name='logs')
op.drop_index('ix_logs_source', table_name='logs')
op.drop_index('ix_logs_status', table_name='logs')
op.drop_index('ix_logs_level', table_name='logs')
op.drop_index('ix_logs_created_at', table_name='logs')
op.drop_index('ix_logs_id', table_name='logs')
# Drop logs table
op.drop_table('logs')
# Drop enums
op.execute("DROP TYPE IF EXISTS logstatus")
op.execute("DROP TYPE IF EXISTS loglevel")
op.execute("DROP TYPE IF EXISTS loglevel")

View file

@ -0,0 +1,57 @@
"""Add JIRA_CONNECTOR to enums
Revision ID: 13
Revises: 12
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '13'
down_revision: Union[str, None] = '12'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Safely add 'JIRA_CONNECTOR' to enum types if missing."""
# Add to searchsourceconnectortype enum
op.execute("""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_type t
JOIN pg_enum e ON t.oid = e.enumtypid
WHERE t.typname = 'searchsourceconnectortype' AND e.enumlabel = 'JIRA_CONNECTOR'
) THEN
ALTER TYPE searchsourceconnectortype ADD VALUE 'JIRA_CONNECTOR';
END IF;
END
$$;
""")
# Add to documenttype enum
op.execute("""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_type t
JOIN pg_enum e ON t.oid = e.enumtypid
WHERE t.typname = 'documenttype' AND e.enumlabel = 'JIRA_CONNECTOR'
) THEN
ALTER TYPE documenttype ADD VALUE 'JIRA_CONNECTOR';
END IF;
END
$$;
""")
def downgrade() -> None:
"""
Downgrade logic not implemented since PostgreSQL
does not support removing enum values.
"""
pass

View file

@ -24,7 +24,22 @@ def upgrade() -> None:
# Manually add the command to add the enum value
# Note: It's generally better to let autogenerate handle this, but we're bypassing it
op.execute("ALTER TYPE searchsourceconnectortype ADD VALUE 'GITHUB_CONNECTOR'")
op.execute("""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM pg_enum
WHERE enumlabel = 'GITHUB_CONNECTOR'
AND enumtypid = (
SELECT oid FROM pg_type WHERE typname = 'searchsourceconnectortype'
)
) THEN
ALTER TYPE searchsourceconnectortype ADD VALUE 'GITHUB_CONNECTOR';
END IF;
END$$;
""")
# Pass for the rest, as autogenerate didn't run to add other schema details
pass

View file

@ -18,14 +18,21 @@ depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# Manually add the command to add the enum value
op.execute("ALTER TYPE searchsourceconnectortype ADD VALUE 'LINEAR_CONNECTOR'")
# Pass for the rest, as autogenerate didn't run to add other schema details
pass
# ### end Alembic commands ###
op.execute("""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_enum
WHERE enumlabel = 'LINEAR_CONNECTOR'
AND enumtypid = (
SELECT oid FROM pg_type WHERE typname = 'searchsourceconnectortype'
)
) THEN
ALTER TYPE searchsourceconnectortype ADD VALUE 'LINEAR_CONNECTOR';
END IF;
END$$;
""")
#
def downgrade() -> None:

View file

@ -22,7 +22,21 @@ NEW_VALUE = 'LINEAR_CONNECTOR'
def upgrade() -> None:
"""Upgrade schema."""
op.execute(f"ALTER TYPE {ENUM_NAME} ADD VALUE '{NEW_VALUE}'")
op.execute(f"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_enum
WHERE enumlabel = '{NEW_VALUE}'
AND enumtypid = (
SELECT oid FROM pg_type WHERE typname = '{ENUM_NAME}'
)
) THEN
ALTER TYPE {ENUM_NAME} ADD VALUE '{NEW_VALUE}';
END IF;
END$$;
""")
# Warning: This will delete all rows with the new value

View file

@ -18,14 +18,24 @@ depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# Manually add the command to add the enum value
op.execute("ALTER TYPE searchsourceconnectortype ADD VALUE 'LINKUP_API'")
# Pass for the rest, as autogenerate didn't run to add other schema details
pass
# ### end Alembic commands ###
ENUM_NAME = 'searchsourceconnectortype'
NEW_VALUE = 'LINKUP_API'
op.execute(f"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_enum
WHERE enumlabel = '{NEW_VALUE}'
AND enumtypid = (
SELECT oid FROM pg_type WHERE typname = '{ENUM_NAME}'
)
) THEN
ALTER TYPE {ENUM_NAME} ADD VALUE '{NEW_VALUE}';
END IF;
END$$;
""")
def downgrade() -> None:

View file

@ -9,6 +9,7 @@ from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSON
from sqlalchemy import inspect
# revision identifiers, used by Alembic.
@ -17,18 +18,25 @@ down_revision: Union[str, None] = '5'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Drop the old column and create a new one with the new name and type
# We need to do this because PostgreSQL doesn't support direct column renames with type changes
op.add_column('podcasts', sa.Column('podcast_transcript', JSON, nullable=False, server_default='{}'))
# Copy data from old column to new column
# Convert text to JSON by storing it as a JSON string value
op.execute("UPDATE podcasts SET podcast_transcript = jsonb_build_object('text', podcast_content) WHERE podcast_content != ''")
# Drop the old column
op.drop_column('podcasts', 'podcast_content')
bind = op.get_bind()
inspector = inspect(bind)
columns = [col["name"] for col in inspector.get_columns("podcasts")]
if "podcast_transcript" not in columns:
op.add_column('podcasts', sa.Column('podcast_transcript', JSON, nullable=False, server_default='{}'))
# Copy data from old column to new column
op.execute("""
UPDATE podcasts
SET podcast_transcript = jsonb_build_object('text', podcast_content)
WHERE podcast_content != ''
""")
# Drop the old column only if it exists
if "podcast_content" in columns:
op.drop_column('podcasts', 'podcast_content')
def downgrade() -> None:

View file

@ -8,6 +8,7 @@ from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy import inspect
# revision identifiers, used by Alembic.
@ -16,10 +17,16 @@ down_revision: Union[str, None] = '6'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Drop the is_generated column
op.drop_column('podcasts', 'is_generated')
# Get the current database connection
bind = op.get_bind()
inspector = inspect(bind)
# Check if the column exists before attempting to drop it
columns = [col["name"] for col in inspector.get_columns("podcasts")]
if "is_generated" in columns:
op.drop_column('podcasts', 'is_generated')
def downgrade() -> None:

View file

@ -4,9 +4,9 @@ Revision ID: 8
Revises: 7
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy import inspect
# revision identifiers, used by Alembic.
@ -17,40 +17,40 @@ depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Add content_hash column as nullable first to handle existing data
op.add_column('documents', sa.Column('content_hash', sa.String(), nullable=True))
# Update existing documents to generate content hashes
# Using SHA-256 hash of the content column with proper UTF-8 encoding
op.execute("""
UPDATE documents
SET content_hash = encode(sha256(convert_to(content, 'UTF8')), 'hex')
WHERE content_hash IS NULL
""")
# Handle duplicate content hashes by keeping only the oldest document for each hash
# Delete newer documents with duplicate content hashes
op.execute("""
DELETE FROM documents
WHERE id NOT IN (
SELECT MIN(id)
FROM documents
GROUP BY content_hash
)
""")
# Now alter the column to match the model: nullable=False, index=True, unique=True
op.alter_column('documents', 'content_hash',
existing_type=sa.String(),
nullable=False)
op.create_index(op.f('ix_documents_content_hash'), 'documents', ['content_hash'], unique=False)
op.create_unique_constraint(op.f('uq_documents_content_hash'), 'documents', ['content_hash'])
bind = op.get_bind()
inspector = inspect(bind)
columns = [col['name'] for col in inspector.get_columns('documents')]
# Only add the column if it doesn't already exist
if 'content_hash' not in columns:
op.add_column('documents', sa.Column('content_hash', sa.String(), nullable=True))
# Populate the content_hash column
op.execute("""
UPDATE documents
SET content_hash = encode(sha256(convert_to(content, 'UTF8')), 'hex')
WHERE content_hash IS NULL
""")
op.execute("""
DELETE FROM documents
WHERE id NOT IN (
SELECT MIN(id)
FROM documents
GROUP BY content_hash
)
""")
op.alter_column('documents', 'content_hash',
existing_type=sa.String(),
nullable=False)
op.create_index(op.f('ix_documents_content_hash'), 'documents', ['content_hash'], unique=False)
op.create_unique_constraint(op.f('uq_documents_content_hash'), 'documents', ['content_hash'])
else:
print("Column 'content_hash' already exists. Skipping column creation.")
def downgrade() -> None:
# Remove constraints and index first
op.drop_constraint(op.f('uq_documents_content_hash'), 'documents', type_='unique')
op.drop_index(op.f('ix_documents_content_hash'), table_name='documents')
# Remove content_hash column from documents table
op.drop_column('documents', 'content_hash')
op.drop_column('documents', 'content_hash')

View file

@ -24,11 +24,35 @@ DOCUMENT_NEW_VALUE = "DISCORD_CONNECTOR"
def upgrade() -> None:
"""Upgrade schema - add DISCORD_CONNECTOR to connector and document enum."""
# Add DISCORD_CONNECTOR to searchsourceconnectortype
op.execute(f"ALTER TYPE {CONNECTOR_ENUM} ADD VALUE '{CONNECTOR_NEW_VALUE}'")
# Add DISCORD_CONNECTOR to documenttype
op.execute(f"ALTER TYPE {DOCUMENT_ENUM} ADD VALUE '{DOCUMENT_NEW_VALUE}'")
"""Upgrade schema - add DISCORD_CONNECTOR to connector and document enum safely."""
# Add DISCORD_CONNECTOR to searchsourceconnectortype only if not exists
op.execute(f"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_enum
WHERE enumlabel = '{CONNECTOR_NEW_VALUE}'
AND enumtypid = (SELECT oid FROM pg_type WHERE typname = '{CONNECTOR_ENUM}')
) THEN
ALTER TYPE {CONNECTOR_ENUM} ADD VALUE '{CONNECTOR_NEW_VALUE}';
END IF;
END$$;
""")
# Add DISCORD_CONNECTOR to documenttype only if not exists
op.execute(f"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_enum
WHERE enumlabel = '{DOCUMENT_NEW_VALUE}'
AND enumtypid = (SELECT oid FROM pg_type WHERE typname = '{DOCUMENT_ENUM}')
) THEN
ALTER TYPE {DOCUMENT_ENUM} ADD VALUE '{DOCUMENT_NEW_VALUE}';
END IF;
END$$;
""")
def downgrade() -> None:

View file

@ -1,15 +1,8 @@
"""Add GITHUB_CONNECTOR to DocumentType enum
Revision ID: e55302644c51
Revises: 1
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'e55302644c51'
down_revision: Union[str, None] = '1'
@ -17,22 +10,30 @@ branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
# Define the ENUM type name and the new value
ENUM_NAME = 'documenttype' # Make sure this matches the name in your DB (usually lowercase class name)
ENUM_NAME = 'documenttype'
NEW_VALUE = 'GITHUB_CONNECTOR'
def upgrade() -> None:
"""Upgrade schema."""
op.execute(f"ALTER TYPE {ENUM_NAME} ADD VALUE '{NEW_VALUE}'")
op.execute(f"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_enum
WHERE enumlabel = '{NEW_VALUE}'
AND enumtypid = (
SELECT oid FROM pg_type WHERE typname = '{ENUM_NAME}'
)
) THEN
ALTER TYPE {ENUM_NAME} ADD VALUE '{NEW_VALUE}';
END IF;
END$$;
""")
# Warning: This will delete all rows with the new value
def downgrade() -> None:
"""Downgrade schema - remove GITHUB_CONNECTOR from enum."""
# The old type name
old_enum_name = f"{ENUM_NAME}_old"
# Enum values *before* GITHUB_CONNECTOR was added
old_values = (
'EXTENSION',
'CRAWLED_URL',
@ -43,27 +44,23 @@ def downgrade() -> None:
)
old_values_sql = ", ".join([f"'{v}'" for v in old_values])
# Table and column names (adjust if different)
table_name = 'documents'
column_name = 'document_type'
# 1. Rename the current enum type
op.execute(f"ALTER TYPE {ENUM_NAME} RENAME TO {old_enum_name}")
# 1. Create the new enum type with the old values
op.execute(f"CREATE TYPE {old_enum_name} AS ENUM({old_values_sql})")
# 2. Create the new enum type with the old values
op.execute(f"CREATE TYPE {ENUM_NAME} AS ENUM({old_values_sql})")
# 3. Update the table:
# 2. Delete rows using the new value
op.execute(
f"DELETE FROM {table_name} WHERE {column_name}::text = '{NEW_VALUE}'"
)
# 4. Alter the column to use the new enum type (casting old values)
# 3. Alter the column to use the old enum type
op.execute(
f"ALTER TABLE {table_name} ALTER COLUMN {column_name} "
f"TYPE {ENUM_NAME} USING {column_name}::text::{ENUM_NAME}"
f"TYPE {old_enum_name} USING {column_name}::text::{old_enum_name}"
)
# 5. Drop the old enum type
op.execute(f"DROP TYPE {old_enum_name}")
# ### end Alembic commands ###
# 4. Drop the current enum type and rename the old one
op.execute(f"DROP TYPE {ENUM_NAME}")
op.execute(f"ALTER TYPE {old_enum_name} RENAME TO {ENUM_NAME}")