Ingestion Pipeline
An IngestionPipeline applies a sequence of Transformations to your input data. The resulting nodes are either returned or, if a vector store is provided, inserted into it.
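To make the idea concrete, here is a minimal self-contained sketch of a transformation chain. The `Transformation` type, `split` helper, and `runPipeline` function are hypothetical simplifications for illustration, not the library's actual API:

```typescript
// Hypothetical simplification: a transformation maps an array of text
// chunks ("nodes") to a new array; a pipeline applies them in order.
type Transformation = (nodes: string[]) => string[];

// Split each node into fixed-size chunks (a stand-in for a real splitter).
const split =
  (chunkSize: number): Transformation =>
  (nodes) =>
    nodes.flatMap((text) => {
      const chunks: string[] = [];
      for (let i = 0; i < text.length; i += chunkSize) {
        chunks.push(text.slice(i, i + chunkSize));
      }
      return chunks;
    });

// Apply transformations left to right.
function runPipeline(
  nodes: string[],
  transformations: Transformation[],
): string[] {
  return transformations.reduce((acc, t) => t(acc), nodes);
}

console.log(runPipeline(["abcdefghij"], [split(4)])); // ["abcd", "efgh", "ij"]
```

A real pipeline works the same way, except the nodes carry metadata and an embedding step typically runs last.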
Installation
```shell
npm add @vectorstores/core @vectorstores/qdrant openai
```

Usage Pattern
The simplest usage is to instantiate an IngestionPipeline like so:
```typescript
import fs from "node:fs/promises";
import { fileURLToPath } from "node:url";
import {
  BaseEmbedding,
  Document,
  IngestionPipeline,
  MetadataMode,
  SentenceSplitter,
  Settings,
  VectorStoreIndex,
} from "@vectorstores/core";
import { OpenAI } from "openai";

async function main() {
  // Configure the embedding function used by BaseEmbedding()
  const openai = new OpenAI();
  Settings.embedFunc = async (input: string[]): Promise<number[][]> => {
    const { data } = await openai.embeddings.create({
      model: "text-embedding-3-small",
      input,
    });
    return data.map((d) => d.embedding);
  };

  // Load the essay from abramov.txt in Node
  const filePath = fileURLToPath(
    new URL("./data/abramov.txt", import.meta.url),
  );
  const essay = await fs.readFile(filePath, "utf-8");

  // Create a Document object with the essay
  const document = new Document({ text: essay, id_: filePath });
  const pipeline = new IngestionPipeline({
    transformations: [
      new SentenceSplitter({ chunkSize: 1024, chunkOverlap: 20 }),
      new BaseEmbedding(),
    ],
  });

  console.time("Pipeline Run Time");

  // Run the pipeline
  const nodes = await pipeline.run({ documents: [document] });

  console.timeEnd("Pipeline Run Time");

  // Initialize the VectorStoreIndex from the nodes
  const index = await VectorStoreIndex.init({ nodes });

  // Retrieve from the index
  const retriever = index.asRetriever();

  const response = await retriever.retrieve({
    query: "What did the author do in college?",
  });

  // Output the results
  for (const result of response) {
    console.log("Score:", result.score ?? "-");
    console.log(result.node.getContent(MetadataMode.NONE));
    console.log("---");
  }
}

main().catch(console.error);
```

Connecting to Vector Databases
When running an ingestion pipeline, you can also choose to automatically insert the resulting nodes into a remote vector store.
Then, you can construct an index from that vector store later on.
```typescript
import fs from "node:fs/promises";
import { fileURLToPath } from "node:url";
import { QdrantVectorStore } from "@vectorstores/qdrant";
import {
  BaseEmbedding,
  Document,
  IngestionPipeline,
  SentenceSplitter,
  Settings,
  VectorStoreIndex,
} from "@vectorstores/core";
import { OpenAI } from "openai";

async function main() {
  // Configure the embedding function used by BaseEmbedding()
  const openai = new OpenAI();
  Settings.embedFunc = async (input: string[]): Promise<number[][]> => {
    const { data } = await openai.embeddings.create({
      model: "text-embedding-3-small",
      input,
    });
    return data.map((d) => d.embedding);
  };

  // Load the essay from abramov.txt in Node
  const filePath = fileURLToPath(
    new URL("./data/abramov.txt", import.meta.url),
  );
  const essay = await fs.readFile(filePath, "utf-8");

  const vectorStore = new QdrantVectorStore({
    host: "http://localhost:6333",
  });

  // Create a Document object with the essay
  const document = new Document({ text: essay, id_: filePath });
  const pipeline = new IngestionPipeline({
    transformations: [
      new SentenceSplitter({ chunkSize: 1024, chunkOverlap: 20 }),
      new BaseEmbedding(),
    ],
    vectorStore,
  });

  // Run the pipeline; the resulting nodes are inserted into the vector store
  await pipeline.run({ documents: [document] });

  // Create an index for retrieval later on
  const index = VectorStoreIndex.fromVectorStore(vectorStore);
}

main().catch(console.error);
```
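Under the hood, retrieval from a vector store is a nearest-neighbor search over the stored embeddings. The following in-memory sketch illustrates that idea only; the `StoredNode` type and the `cosine` and `retrieve` helpers are hypothetical, not the Qdrant client API:

```typescript
// Illustrative in-memory stand-in for a vector store entry.
type StoredNode = { id: string; vector: number[]; text: string };

// Cosine similarity between two equal-length vectors.
function cosine(a: number[], b: number[]): number {
  let dot = 0,
    na = 0,
    nb = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    na += a[i] * a[i];
    nb += b[i] * b[i];
  }
  return dot / (Math.sqrt(na) * Math.sqrt(nb));
}

// Return the stored node whose embedding is most similar to the query.
function retrieve(store: StoredNode[], query: number[]): StoredNode {
  return [...store].sort(
    (x, y) => cosine(query, y.vector) - cosine(query, x.vector),
  )[0];
}

const store: StoredNode[] = [
  { id: "a", vector: [1, 0], text: "chunk about college" },
  { id: "b", vector: [0, 1], text: "chunk about work" },
];

console.log(retrieve(store, [0.9, 0.1]).id); // "a"
```

A real vector database does the same comparison at scale, using approximate nearest-neighbor indexes instead of a full sort.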