@kurokobo
Last active December 6, 2025 13:03
Dify: Weaviate 1.19 to 1.27+ Migration Guide (community-edited, simplified)
#!/usr/bin/env python3
"""
Migration script to fix Weaviate schema incompatibility between 1.19.0 and 1.27.0+
This script:
- Identifies collections with old schema (no vectorConfig)
- Creates new collections with proper vectorConfig including "default" named vector
- Migrates data using cursor-based pagination (efficient for large datasets)
- Uses batch operations for fast inserts
- Preserves all object properties and vectors
Note:
- This is a community-edited version of the draft of the script presented by the Dify Team.
- This script is not officially supported by the Dify Team.
- The original source for this script can be found at https://github.com/langgenius/dify/issues/27291#issuecomment-3501003678.
- The changes made in this script are:
  - Retrieve Weaviate connection info from environment variables to make this script run in the Worker container.
  - Switch to cursor-based pagination in "replace_old_collection", since the migration could fail with large collections.
  - Fix an issue where both the old and new collections remained without being deleted after migrating an empty collection.
"""
import os
import sys
from typing import Any, Dict, List

import weaviate
# Configuration
WEAVIATE_ENDPOINT = os.getenv("WEAVIATE_ENDPOINT", "http://weaviate:8080")
WEAVIATE_GRPC_ENDPOINT = os.getenv("WEAVIATE_GRPC_ENDPOINT", "grpc://weaviate:50051")
WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY", "WVF5YThaHlkYwhGUSmCRgsX3tD5ngdN8pkih")
BATCH_SIZE = 1000
WEAVIATE_HOST = WEAVIATE_ENDPOINT.split("//")[-1].split(":")[0]
WEAVIATE_PORT = int(WEAVIATE_ENDPOINT.split(":")[-1])
WEAVIATE_GRPC_PORT = int(WEAVIATE_GRPC_ENDPOINT.split(":")[-1])
def identify_old_collections(client: weaviate.WeaviateClient) -> List[str]:
    """Identify collections that need migration (those without vectorConfig)"""
    collections_to_migrate = []
    all_collections = client.collections.list_all()
    print(f"Found {len(all_collections)} total collections")
    for collection_name in all_collections.keys():
        # Only check Vector_index collections (Dify knowledge bases)
        if not collection_name.startswith("Vector_index_"):
            continue
        collection = client.collections.get(collection_name)
        config = collection.config.get()
        # Check if this collection has the old schema
        if config.vector_config is None:
            collections_to_migrate.append(collection_name)
            print(f" - {collection_name}: OLD SCHEMA (needs migration)")
        else:
            print(f" - {collection_name}: NEW SCHEMA (skip)")
    return collections_to_migrate
def get_collection_schema(client: weaviate.WeaviateClient, collection_name: str) -> Dict[str, Any]:
    """Get the full schema of a collection via REST API"""
    import requests
    # Use WEAVIATE_ENDPOINT directly (as the other functions do) so the
    # configured scheme is preserved instead of hard-coding "http".
    response = requests.get(
        f"{WEAVIATE_ENDPOINT}/v1/schema/{collection_name}",
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"},
    )
    if response.status_code == 200:
        return response.json()
    else:
        raise Exception(f"Failed to get schema: {response.text}")
def create_new_collection(client: weaviate.WeaviateClient, old_name: str, schema: Dict[str, Any]) -> str:
    """Create a new collection with updated schema using REST API"""
    import requests
    # Generate new collection name
    new_name = f"{old_name}_migrated"
    print(f"Creating new collection: {new_name}")
    # Build new schema with proper vectorConfig
    # Note: When using vectorConfig (named vectors), we don't set class-level vectorizer
    new_schema = {
        "class": new_name,
        # This is the key: define vectorConfig with "default" named vector
        # Do NOT set class-level vectorizer when using vectorConfig
        "vectorConfig": {
            "default": {
                "vectorizer": {
                    "none": {}
                },
                "vectorIndexType": "hnsw",
                "vectorIndexConfig": {
                    "distance": "cosine",
                    "ef": -1,
                    "efConstruction": 128,
                    "maxConnections": 32
                }
            }
        },
        "properties": []
    }
    # Copy properties from old schema
    if "properties" in schema:
        new_schema["properties"] = schema["properties"]
    # Create collection via REST API
    response = requests.post(
        f"{WEAVIATE_ENDPOINT}/v1/schema",
        json=new_schema,
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"},
    )
    if response.status_code not in [200, 201]:
        raise Exception(f"Failed to create collection: {response.text}")
    print(f" Created new collection: {new_name}")
    return new_name
def migrate_collection_data(
    client: weaviate.WeaviateClient,
    old_collection_name: str,
    new_collection_name: str,
) -> int:
    """Migrate data from old collection to new collection using cursor-based pagination"""
    old_collection = client.collections.get(old_collection_name)
    new_collection = client.collections.get(new_collection_name)
    total_migrated = 0
    cursor = None
    print(f"Migrating data from {old_collection_name} to {new_collection_name}")
    while True:
        # Fetch batch of objects using cursor-based pagination
        if cursor is None:
            # First batch
            response = old_collection.query.fetch_objects(
                limit=BATCH_SIZE,
                include_vector=True,
            )
        else:
            # Subsequent batches using cursor
            response = old_collection.query.fetch_objects(
                limit=BATCH_SIZE,
                include_vector=True,
                after=cursor,
            )
        objects = response.objects
        if not objects:
            break
        # Use batch insert for efficiency
        with new_collection.batch.dynamic() as batch:
            for obj in objects:
                # Prepare properties
                properties = obj.properties
                # Add object with vector
                batch.add_object(
                    properties=properties,
                    vector=obj.vector["default"] if isinstance(obj.vector, dict) else obj.vector,
                    uuid=obj.uuid,
                )
        total_migrated += len(objects)
        print(f" Migrated {total_migrated} objects...")
        # Update cursor for next iteration
        if len(objects) < BATCH_SIZE:
            # Last batch
            break
        else:
            # Get the last object's UUID for cursor
            cursor = objects[-1].uuid
    print(f" Total migrated: {total_migrated} objects")
    return total_migrated
def verify_migration(
    client: weaviate.WeaviateClient,
    old_collection_name: str,
    new_collection_name: str,
):
    """Verify that the migration was successful"""
    old_collection = client.collections.get(old_collection_name)
    new_collection = client.collections.get(new_collection_name)
    # Get aggregation for accurate counts
    old_agg = old_collection.aggregate.over_all(total_count=True)
    new_agg = new_collection.aggregate.over_all(total_count=True)
    old_count = old_agg.total_count
    new_count = new_agg.total_count
    print("\nVerification:")
    print(f" Old collection ({old_collection_name}): {old_count} objects")
    print(f" New collection ({new_collection_name}): {new_count} objects")
    if old_count == new_count:
        print(" Status: SUCCESS - Counts match!")
        return True
    else:
        print(" Status: WARNING - Counts don't match!")
        return False
def replace_old_collection(
    client: weaviate.WeaviateClient,
    old_collection_name: str,
    new_collection_name: str,
):
    """Replace old collection with migrated one by recreating with original name"""
    import requests
    print("\nReplacing old collection with migrated data...")
    # Step 1: Delete old collection
    print(" Step 1: Deleting old collection...")
    response = requests.delete(
        f"{WEAVIATE_ENDPOINT}/v1/schema/{old_collection_name}",
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"},
    )
    if response.status_code != 200:
        print(f" Warning: Could not delete old collection: {response.text}")
    else:
        print(" Deleted")
    # Step 2: Get schema from migrated collection
    print(" Step 2: Getting schema from migrated collection...")
    schema_response = requests.get(
        f"{WEAVIATE_ENDPOINT}/v1/schema/{new_collection_name}",
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"},
    )
    schema = schema_response.json()
    schema["class"] = old_collection_name
    # Step 3: Create collection with original name and new schema
    print(" Step 3: Creating collection with original name...")
    create_response = requests.post(
        f"{WEAVIATE_ENDPOINT}/v1/schema",
        json=schema,
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"},
    )
    if create_response.status_code not in [200, 201]:
        raise Exception(f"Failed to create collection: {create_response.text}")
    print(" Created")
    # Step 4: Copy data to collection with original name using cursor-based pagination
    print(" Step 4: Copying data to original collection name...")
    migrated_collection = client.collections.get(new_collection_name)
    new_collection = client.collections.get(old_collection_name)
    total_copied = 0
    cursor = None
    while True:
        # Fetch batch of objects using cursor-based pagination
        if cursor is None:
            # First batch
            response = migrated_collection.query.fetch_objects(
                include_vector=True,
                limit=BATCH_SIZE,
            )
        else:
            # Subsequent batches using cursor
            response = migrated_collection.query.fetch_objects(
                include_vector=True,
                limit=BATCH_SIZE,
                after=cursor,
            )
        objects = response.objects
        if not objects:
            break
        # Use batch insert for efficiency
        with new_collection.batch.dynamic() as batch:
            for obj in objects:
                batch.add_object(
                    properties=obj.properties,
                    vector=obj.vector,
                    uuid=obj.uuid,
                )
        total_copied += len(objects)
        print(f" Copied {total_copied} objects...")
        # Update cursor for next iteration
        if len(objects) < BATCH_SIZE:
            break
        else:
            cursor = objects[-1].uuid
    print(f" Total copied: {total_copied} objects")
    # Step 5: Delete the temporary migrated collection
    print(" Step 5: Cleaning up temporary migrated collection...")
    response = requests.delete(
        f"{WEAVIATE_ENDPOINT}/v1/schema/{new_collection_name}",
        headers={"Authorization": f"Bearer {WEAVIATE_API_KEY}"},
    )
    if response.status_code == 200:
        print(" Cleaned up")
    print(f"\n SUCCESS! {old_collection_name} now has the new schema with {total_copied} objects")
    return True
def migrate_all_collections():
    """Main migration function"""
    print("=" * 80)
    print("Weaviate Collection Migration Script")
    print("Migrating from Weaviate 1.19.0 schema to 1.27.0+ schema")
    print("=" * 80)
    print()
    client = weaviate.connect_to_local(
        host=WEAVIATE_HOST,
        port=WEAVIATE_PORT,
        grpc_port=WEAVIATE_GRPC_PORT,
        auth_credentials=weaviate.auth.AuthApiKey(WEAVIATE_API_KEY),
    )
    try:
        # Step 1: Identify collections that need migration
        print("Step 1: Identifying collections that need migration...")
        collections_to_migrate = identify_old_collections(client)
        if not collections_to_migrate:
            print("\nNo collections need migration. All collections are up to date!")
            return
        print(f"\nFound {len(collections_to_migrate)} collections to migrate:")
        for col in collections_to_migrate:
            print(f" - {col}")
        # Confirm before proceeding
        print("\nThis script will:")
        print("1. Create new collections with updated schema")
        print("2. Copy all data using efficient batch operations")
        print("3. Verify the migration")
        print("4. Optionally rename collections to activate the new ones")
        print()
        # Step 2: Migrate each collection
        for collection_name in collections_to_migrate:
            print("\n" + "=" * 80)
            print(f"Migrating: {collection_name}")
            print("=" * 80)
            try:
                # Get old schema
                schema = get_collection_schema(client, collection_name)
                # Create new collection
                new_collection_name = create_new_collection(client, collection_name, schema)
                # Migrate data
                migrated_count = migrate_collection_data(client, collection_name, new_collection_name)
                # Verify migration
                success = verify_migration(client, collection_name, new_collection_name)
                if success:
                    print(f"\nMigration successful for {collection_name}!")
                    print(f"New collection: {new_collection_name}")
                    # Automatically replace old collection with migrated one
                    try:
                        replace_old_collection(client, collection_name, new_collection_name)
                    except Exception as e:
                        print(f"\nWarning: Could not automatically replace collection: {e}")
                        print("\nTo activate manually:")
                        print(f"1. Delete the old collection: {collection_name}")
                        print(f"2. Rename {new_collection_name} to {collection_name}")
            except Exception as e:
                print(f"\nError migrating {collection_name}: {e}")
                print("Skipping this collection and continuing...")
                continue
        print("\n" + "=" * 80)
        print("Migration Complete!")
        print("=" * 80)
        print("\nSummary:")
        print(f" Collections migrated: {len(collections_to_migrate)}")
    finally:
        client.close()
if __name__ == "__main__":
    try:
        migrate_all_collections()
    except KeyboardInterrupt:
        print("\n\nMigration interrupted by user.")
        sys.exit(1)
    except Exception as e:
        print(f"\n\nFatal error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)

Weaviate 1.19 to 1.27+ Migration Guide for Dify

  • ⚠️ This guide is not officially supported by the Dify Team.
  • ⚠️ This is a community-edited, simplified version of the draft of the official migration guide presented by the Dify Team.
  • ⚠️ The original source for this guide and script can be found at #27291.

Complete guide to safely migrate Dify knowledge bases from Weaviate 1.19 to 1.27/1.33.


Outline

This guide covers the following two cases. Case A is the recommended, safer starting point, but the procedure also applies to Case B:

  • Case A
    • You are currently running a version of Dify 1.9.1 or earlier with Weaviate 1.19 included.
    • All knowledge is functioning properly.
  • Case B
    • You have already upgraded to Weaviate 1.27+ and are running Dify 1.9.2 or later.
    • The knowledge created with the previous version is corrupted, and you have no backup to revert to the earlier version.

The procedure in this guide is as follows:

  1. Take a complete backup of your current Dify environment.
  2. If your Dify version is 1.9.1 or earlier, upgrade Dify.
  3. Work inside the weaviate container to fix the directory structure of the LSM data.
  4. Work inside the worker container to run the migration script.
  5. Perform cleanup.

Migration Procedure

Note:
This procedure cannot be rolled back by any means other than restoring a backup; attempting to roll back any other way may make things worse.
We recommend that you follow the steps to take a full backup first, in preparation for a possible restore.


Step 1: Backup Your Environment

Stop your Dify services:

cd /path/to/dify/docker
docker compose down

Then make a full copy or archive of your entire docker directory (for example, /path/to/dify/docker) as a safety measure.

If you encounter issues later, you can restore this backup to revert to the original state.


Step 2: Upgrade to Weaviate 1.27+ (Only for Case A)

This step is only for Case A - users currently on Dify 1.9.1 or earlier with Weaviate 1.19.
If you are already running Weaviate 1.27+ (Case B), you can skip this step.

Follow the upgrade guide to move to the latest (or a specific) Dify version that uses Weaviate 1.27+.


Step 3: Fix Orphaned LSM Data

If your Dify has stopped, start it and wait until it has fully launched.

cd /path/to/dify/docker
docker compose up -d

Ensure your Weaviate container is using image version 1.27.0 or higher:

cd /path/to/dify/docker
docker compose ps weaviate  # The "IMAGE" column should show "semitechnologies/weaviate:1.27.0" or higher

Enter the shell of your weaviate container:

cd /path/to/dify/docker
docker compose exec -it weaviate /bin/sh

Then run the following commands inside the container to fix LSM data:

cd /var/lib/weaviate
for dir in vector_index_*_node_*_lsm; do
  [ -d "$dir" ] || continue
  
  # Extract index ID and shard ID
  index_id=$(echo "$dir" | sed -n 's/vector_index_\([^_]*_[^_]*_[^_]*_[^_]*_[^_]*\)_node_.*/\1/p')
  shard_id=$(echo "$dir" | sed -n 's/.*_node_\([^_]*\)_lsm/\1/p')
  
  # Create target directory and copy
  mkdir -p "vector_index_${index_id}_node/$shard_id/lsm"
  cp -a "$dir/"* "vector_index_${index_id}_node/$shard_id/lsm/"
  
  echo "✓ Copied $dir"
done
exit
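If you want to sanity-check the renaming logic before touching real data, the same name transformation performed by the sed expressions above can be sketched in Python (the directory name in the comments is illustrative, not from your environment):

```python
import re
from typing import Optional

def lsm_target(dirname: str) -> Optional[str]:
    """Map an orphaned 'vector_index_<id>_node_<shard>_lsm' directory name to
    its expected 'vector_index_<id>_node/<shard>/lsm' location, mirroring the
    two sed expressions in the shell loop above. Returns None for names that
    do not match the orphaned-LSM pattern."""
    # The index ID is five underscore-separated tokens (a UUID with '_' in
    # place of '-'), matching sed's \([^_]*_[^_]*_[^_]*_[^_]*_[^_]*\).
    m = re.fullmatch(r"vector_index_([^_]+(?:_[^_]+){4})_node_([^_]+)_lsm", dirname)
    if m is None:
        return None
    index_id, shard_id = m.groups()
    return f"vector_index_{index_id}_node/{shard_id}/lsm"
```

Running this over `os.listdir("/var/lib/weaviate")` and printing the mappings gives you a dry run of the loop above without copying anything.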

Then restart the weaviate container to ensure the changes are recognized:

cd /path/to/dify/docker
docker compose restart weaviate

Step 4: Migrate Schema

Place the migrate_weaviate_collections_ce.py script in your /path/to/dify/docker/volumes/app/storage/ directory, then enter the shell of your worker container:

cp /path/to/migrate_weaviate_collections_ce.py /path/to/dify/docker/volumes/app/storage/
cd /path/to/dify/docker
docker compose exec -it worker /bin/bash

Then run the following commands inside the container to execute the migration script:

uv run /app/api/storage/migrate_weaviate_collections_ce.py
exit

Restart Dify services:

docker compose down
docker compose up -d

Verify in Dify UI:

  1. Go to your Dify console
  2. Open your knowledge bases
  3. Try "Retrieval Testing"
  4. Should work without errors!
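If you prefer to double-check outside the UI, the class schema returned by Weaviate's /v1/schema/<class> endpoint can be inspected for the named-vector layout the script creates. A minimal sketch (the two dicts below are illustrative fragments modeled on what the migration script reads and writes, not real API responses):

```python
def has_named_vector(class_schema: dict) -> bool:
    """True if the class schema already defines the 'default' named vector,
    i.e. the post-migration (Weaviate 1.27+) layout created by the script."""
    return "default" in (class_schema.get("vectorConfig") or {})

# Illustrative schema fragments:
old_style = {"class": "Vector_index_x_Node", "vectorizer": "none"}
new_style = {
    "class": "Vector_index_x_Node",
    "vectorConfig": {"default": {"vectorIndexType": "hnsw"}},
}
```

A migrated knowledge base should report the `new_style` shape for its `Vector_index_*` class.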

Step 5: Cleanup (Optional)

After successful migration, you can delete orphaned files to free up space.
Enter the shell of your weaviate container:

cd /path/to/dify/docker
docker compose exec -it weaviate /bin/sh

Then run the following commands inside the container to delete orphaned files:

cd /var/lib/weaviate
rm -rf vector_index_*_node_*
exit

Also, you can delete the migration script from your storage volume:

rm /path/to/dify/docker/volumes/app/storage/migrate_weaviate_collections_ce.py


Credits

  • Original migration approach: Dify team
  • LSM recovery method: Chinese Dify community user
  • Combined solution: Community effort

kurokobo commented Dec 4, 2025

@jmjoy @tomy-kyu
I've updated the script. Since a batch size of 10,000 might exceed GRPC_MAX_MESSAGE_SIZE, I've set it to 1,000 just to be safe.


jmjoy commented Dec 4, 2025

Handling replace_old_collection based on pagination feels a bit risky to me, because it requires deleting the old class first. If something goes wrong during the paginated migration process, you’d need ways to recover or retry, which is a real hassle.

The root of all these problems is that Weaviate somehow doesn’t support what should be the most basic database operation: renaming.


tomy-kyu commented Dec 4, 2025

@jmjoy

I apologize for presenting an unusual processing example.
Ah, now I understand why we were seeing the following warning message:

Warning: Could not automatically replace collection: Query call with protocol GRPC search failed with the message "grpc: trying to send message larger than max (46033918 vs. 10485760)."

To manually resolve this issue:
1. Delete the old collection: Vector_index_015e0da7_4e44_4715_9127_8142873eea25_Node
2. Rename Vector_index_015e0da7_4e44_4715_9127_8142873eea25_Node to Vector_index_015e0da7_4e44_4715_9127_8142873eea25_Node_migrated

So the solution is to manually rename the collection using OS commands or similar methods?

Based on this, it seems better not to modify GRPC_MAX_MESSAGE_SIZE. Instead, we should intentionally let the message size exceed GRPC_MAX_MESSAGE_SIZE so that the above warning is actively triggered.

The reason is that if we pass the expanded GRPC_MAX_MESSAGE_SIZE to the next processing stage, the replace_old_collection operation will still apply the limit=10000 constraint, causing any data exceeding this limit to be truncated—potentially leading to the accidental deletion of temporary collections as well.

I believe this important consideration should be noted explicitly in the documentation.


kurokobo commented Dec 4, 2025

@jmjoy @tomy-kyu
To summarize, the script originally created by the Dify Team migrates each collection in the following two steps (1️⃣ and 2️⃣):

Vector_index_* (Original collection with old schema)
  ↓
  ↓  1️⃣ Migrate all objects in batches of 100 using pagination.
  ↓
Vector_index_*_migrated (New collection with new schema)
  ↓
  ↓  2️⃣ Migrate 10,000 objects all at once
  ↓    🚨 If there are more than 10,000 objects, the extra objects will be truncated.
  ↓    🚨 The GRPC_MAX_MESSAGE_SIZE could be exceeded, causing a warning to be triggered, and the migration may be interrupted.
  ↓
Vector_index_* (Re-created new collection with new schema)

Based on your comments, I have already modified the script in this gist as follows:

Vector_index_* (Original collection with old schema)
  ↓
  ↓  1️⃣ Migrate all objects in batches of 1,000 using pagination.
  ↓    ✅ Update batch size from 100 to 1,000 for faster migration
  ↓
Vector_index_*_migrated (New collection with new schema)
  ↓
  ↓  2️⃣ Migrate all objects in batches of 1,000 using pagination.
  ↓    ✅ Use pagination to support large collections
  ↓
Vector_index_* (Re-created new collection with new schema)

Even when using pagination, if the batch size is set to 10,000, there's a possibility of exceeding GRPC_MAX_MESSAGE_SIZE, so in my script, I set it to 1,000.
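The 1,000 figure can be sanity-checked with rough arithmetic: a fetch with include_vector=True is dominated by the vectors themselves, so the response size is roughly objects × dimensions × 4 bytes for float32 embeddings. A sketch (1,536 dimensions is an assumed, illustrative embedding size; 10485760 is the default limit reported in the gRPC warning earlier in this thread):

```python
GRPC_DEFAULT_MAX = 10485760  # default GRPC_MAX_MESSAGE_SIZE seen in the warning above

def approx_fetch_bytes(num_objects: int, vector_dim: int, bytes_per_float: int = 4) -> int:
    """Rough lower bound on the gRPC response size for fetching num_objects
    objects with include_vector=True (ignores properties and protobuf overhead)."""
    return num_objects * vector_dim * bytes_per_float

# With assumed 1,536-dim float32 vectors:
# 10,000 objects is roughly 61 MB (well over the limit),
# while 1,000 objects is roughly 6 MB (comfortably under it).
```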

@jmjoy
Agree with you, but the function replace_old_collection is used for step 2️⃣.
It first deletes the collection, which targets the old collection with the old schema. Even if an error occurs mid-pagination, as long as all the data has been persistently stored in Vector_index_*_migrated (by step 1️⃣), it should be technically possible to recover by intervening manually (though, to be fair, the script is not automated to handle that).
Also, since the old directory still remains in volumes, you can start the migration over from the beginning if needed.


tomy-kyu commented Dec 4, 2025

Ah, so if the write-back process fails and stops, the subsequent migration collection deletion operation won't occur. In that case, you should:

To manually resolve this issue:
1. Delete the old collection: Vector_index_*_Node
2. Rename Vector_index_*_Node to Vector_index_*_Node_migrated

This appears to be the correct approach.


tomy-kyu commented Dec 4, 2025

My primary concern was that when running the initial script in an environment where gRPC-related configuration values had been modified, the replace_old_collection operation would truncate most of the larger index data and completely remove the primary collection data, completing the process without error. This would also result in the loss of migrated data, leaving only the option to restore from backup.

While 10,000 objects might seem insignificant in Japanese terms, with only about 416,000 characters of capacity, simply including a few documents would easily exceed this limit. In my environment, this was unacceptable, so I incorporated a similar migration workflow into the replace operation.

In particular, knowledge data often contains numerous documents and may naturally have large index sizes - this example was provided as reference material for such cases.


tomy-kyu commented Dec 5, 2025

@kurokobo
Thank you for your hard work on these modifications.
This procedure contains more detailed steps than the original plan, so when actually implementing the work, I plan to refer to this guide as a reference. If this proves to be of any help to you, I would be truly grateful.

One question I have for confirmation: In Step 3, you're starting the DIFY-related containers updated to version 1.9.2. Is this done so that the Worker-side can access the updated Weaviate client when performing operations in Step 4?

I also wondered whether starting the API also triggers schema updates in PostgreSQL, so perhaps you launched all services at once to ensure synchronization of update timing. I'd appreciate it if you could confirm this for me.


kurokobo commented Dec 5, 2025

@tomy-kyu
Thank you for your positive comment.

One question I have for confirmation: In Step 3, you're starting the DIFY-related containers updated to version 1.9.2. Is this done so that the Worker-side can access the updated Weaviate client when performing operations in Step 4?

I also wondered whether starting the API also triggers schema updates in PostgreSQL, so perhaps you launched all services at once to ensure synchronization of update timing. I'd appreciate it if you could confirm this for me.

Your understanding is almost correct.

Technically, PostgreSQL migration is completely independent from Weaviate migration, and there is no dependency between them.
Therefore, when considering the migration steps for Weaviate, there is no need to take the PostgreSQL migration into account.

However, as you mentioned, Weaviate migration requires a new client, which is deployed in the Worker, API, and Beat containers. Of course, it's also possible to set up the Weaviate client on a user's terminal (as the official documentation suggests), but that's not necessarily a recommended approach for everyone.

Given this background, I thought that updating the API, Worker, Beat, and Weaviate all at once would be the most reasonable, simple, and least problematic migration process.

Hope this helps :)
