761 lines
27 KiB
Python
761 lines
27 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Ollama DeepSeek V2 Lite Client with improved Docker management
|
|
"""
|
|
|
|
import requests
|
|
import json
|
|
import time
|
|
import subprocess
|
|
import sys
|
|
import socket
|
|
import os
|
|
import glob
|
|
#!/usr/bin/env python3
|
|
"""
|
|
Ollama DeepSeek V2 Lite Client - Fixed Version
|
|
"""
|
|
|
|
import requests
|
|
import json
|
|
import time
|
|
import subprocess
|
|
import sys
|
|
import socket
|
|
|
|
class OllamaDeepSeekClient:
    """Talk to an Ollama HTTP API and manage the Docker container hosting it.

    Attributes:
        base_url: Root URL of the Ollama HTTP API.
        model: Model tag sent with every generation request.
        container_name: Name of the Docker container managed by
            ``setup_ollama`` / ``start_ollama``.
    """

    def __init__(self, base_url="http://localhost:11434"):
        self.base_url = base_url
        self.model = "deepseek-coder:6.7b"
        self.container_name = "ollama-deepseek"

    def is_port_in_use(self, port):
        """Return True when something accepts TCP connections on localhost:port."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            return s.connect_ex(('localhost', port)) == 0

    def check_ollama_running(self, timeout=10):
        """Return True when ``GET /api/tags`` answers with HTTP 200.

        Any request failure (refused connection, timeout, ...) counts as
        "not running" instead of propagating to the caller.
        """
        try:
            response = requests.get(f"{self.base_url}/api/tags", timeout=timeout)
        except requests.exceptions.RequestException:
            return False
        return response.status_code == 200

    def _container_listed(self, include_stopped):
        """Return True when ``docker ps`` lists exactly ``self.container_name``."""
        command = ["docker", "ps"]
        if include_stopped:
            command.append("-a")
        command += [
            "--filter", f"name={self.container_name}",
            "--format", "{{.Names}}",
        ]
        try:
            result = subprocess.run(
                command, capture_output=True, text=True, check=True
            )
        except (subprocess.CalledProcessError, OSError):
            # OSError covers a missing or non-executable docker binary.
            return False
        # Compare whole names: a plain substring test would also accept
        # containers such as "<name>-old".
        return self.container_name in result.stdout.split()

    def check_container_exists(self):
        """Check if the container exists (running or stopped)."""
        return self._container_listed(include_stopped=True)

    def check_container_running(self):
        """Check if the container is running."""
        return self._container_listed(include_stopped=False)

    def _wait_until_ready(self, attempts, timeout=10):
        """Poll the API once per second; return True as soon as it answers."""
        for _ in range(attempts):
            time.sleep(1)
            if self.check_ollama_running(timeout=timeout):
                return True
        return False

    def setup_ollama(self):
        """Create a fresh Ollama container and pull the configured model.

        Returns:
            True when the container started, the API became reachable and the
            model was pulled; False otherwise (a message is printed).
        """
        print("Setting up Ollama Docker container...")

        if self.is_port_in_use(11434):
            print("❌ Port 11434 is in use. Checking if it's our container...")
            if not self.check_container_running():
                print("Another process is using port 11434. Please free the port or use a different one.")
                return False

        # No shell is involved, so "~" has to be expanded here.
        data_dir = os.path.join(os.path.expanduser("~"), "ollama_data")
        try:
            os.makedirs(data_dir, exist_ok=True)
        except OSError as e:
            print(f"❌ Could not create data directory {data_dir}: {e}")
            return False

        try:
            # Best-effort removal of a previous container; failure is fine.
            # capture_output cannot be combined with an explicit stderr
            # argument, so both streams are discarded explicitly.
            subprocess.run(
                ["docker", "rm", "-f", self.container_name],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )

            print("🚀 Starting Ollama container...")
            subprocess.run([
                "docker", "run", "-d",
                "--name", self.container_name,
                "-p", "11434:11434",
                "-v", f"{data_dir}:/root/.ollama",
                "--restart", "unless-stopped",
                "ollama/ollama",
            ], check=True)

            print("⏳ Waiting for Ollama to initialize...")
            if not self._wait_until_ready(30, timeout=5):
                print("❌ Ollama failed to start in time")
                return False

            print("📥 Pulling DeepSeek model (this may take a few minutes)...")
            result = subprocess.run([
                "docker", "exec", self.container_name,
                "ollama", "pull", self.model,
            ], capture_output=True, text=True)
        except (subprocess.CalledProcessError, OSError) as e:
            print(f"❌ Docker command failed: {e}")
            return False

        if result.returncode == 0:
            print("✅ Setup completed successfully!")
            return True
        print(f"❌ Failed to pull model: {result.stderr}")
        return False

    def start_ollama(self):
        """Start the existing container and wait up to 15 polls for the API."""
        try:
            print("🔧 Starting existing container...")
            subprocess.run(["docker", "start", self.container_name], check=True)
        except (subprocess.CalledProcessError, OSError) as e:
            print(f"❌ Failed to start container: {e}")
            return False

        print("⏳ Waiting for Ollama to be ready...")
        if self._wait_until_ready(15):
            print("✅ Ollama is ready!")
            return True

        print("❌ Ollama didn't become ready in time")
        return False

    def get_system_info(self):
        """Return a dict of booleans describing port, container and API state."""
        return {
            'port_11434_used': self.is_port_in_use(11434),
            'container_exists': self.check_container_exists(),
            'container_running': self.check_container_running(),
            'ollama_accessible': self.check_ollama_running(),
        }

    def list_models(self):
        """Print the models reported by ``/api/tags``; return True on success."""
        try:
            response = requests.get(f"{self.base_url}/api/tags", timeout=10)
            if response.status_code != 200:
                print("❌ Failed to list models")
                return False
            models = response.json()
        except requests.exceptions.RequestException as e:
            print(f"❌ Error listing models: {e}")
            return False

        print("\n📚 Available models:")
        for model in models.get('models', []):
            print(f" - {model['name']}")
        return True

    def generate_response(self, prompt, stream=False, **kwargs):
        """Send ``prompt`` to ``/api/generate`` and return the generated text.

        Args:
            prompt: Text sent to the model.
            stream: When True, chunks are printed as they arrive.
            **kwargs: Optional ``temperature``, ``top_p``, ``top_k`` and
                ``max_tokens`` (sent as ``num_predict``).

        Returns:
            The response text, or None when the server is unreachable or the
            request fails.
        """
        if not self.check_ollama_running():
            print("❌ Ollama is not running")
            return None

        url = f"{self.base_url}/api/generate"
        payload = {
            "model": self.model,
            "prompt": prompt,
            "stream": stream,
            "options": {
                "temperature": kwargs.get('temperature', 0.7),
                "top_p": kwargs.get('top_p', 0.9),
                "top_k": kwargs.get('top_k', 40),
                "num_predict": kwargs.get('max_tokens', 2048),
            },
        }

        try:
            if stream:
                return self._stream_response(url, payload)
            return self._generate_complete(url, payload)
        except requests.exceptions.RequestException as e:
            print(f"❌ Error generating response: {e}")
            return None

    def _generate_complete(self, url, payload):
        """POST the payload and return the ``response`` field of the JSON reply."""
        response = requests.post(url, json=payload, timeout=120)
        if response.status_code != 200:
            print(f"❌ API Error: {response.status_code}")
            return None
        return response.json().get('response', '')

    def _stream_response(self, url, payload):
        """POST the payload, echo chunks as they arrive, return the full text."""
        # The context manager releases the connection even when the loop
        # stops early on the "done" marker.
        with requests.post(url, json=payload, stream=True, timeout=120) as response:
            if response.status_code != 200:
                print(f"❌ API Error: {response.status_code}")
                return None

            chunks = []
            for line in response.iter_lines():
                if not line:
                    continue
                try:
                    data = json.loads(line)
                except json.JSONDecodeError:
                    # Skip lines that are not valid JSON.
                    continue
                if 'response' in data:
                    chunk = data['response']
                    print(chunk, end='', flush=True)
                    chunks.append(chunk)
                if data.get('done', False):
                    print()
                    break
            return "".join(chunks)

    def chat(self, message, conversation_history=None):
        """Append ``message`` to the history, query the model, return the reply.

        Returns:
            ``(reply, conversation_history)``. The history list passed in is
            extended in place; when generation fails the reply is a fixed
            apology string and no assistant turn is recorded.
        """
        if conversation_history is None:
            conversation_history = []

        conversation_history.append({"role": "user", "content": message})
        formatted_prompt = self._format_conversation(conversation_history)
        response = self.generate_response(formatted_prompt, stream=False)

        if response:
            conversation_history.append({"role": "assistant", "content": response})
            return response, conversation_history
        return "Sorry, I couldn't generate a response.", conversation_history

    def _format_conversation(self, conversation_history):
        """Render the turns as ``User:`` / ``Assistant:`` paragraphs.

        The result ends with an open ``Assistant:`` cue for the model.
        """
        parts = []
        for turn in conversation_history:
            speaker = "User" if turn["role"] == "user" else "Assistant"
            parts.append(f"{speaker}: {turn['content']}\n\n")
        parts.append("Assistant:")
        return "".join(parts)
|
|
|
|
def main():
    """Run the basic interactive chat.

    Prints a status summary, offers to start or create the Ollama container
    when the API is unreachable, then loops reading user input until
    'quit'/'exit', Ctrl-C or end of input.
    """
    client = OllamaDeepSeekClient()

    print("🤖 Ollama DeepSeek V2 Lite Chat")
    print("=" * 50)

    # Get system info
    info = client.get_system_info()
    print("\nSystem Status:")
    print(f" Port 11434: {'🔴 Used' if info['port_11434_used'] else '🟢 Free'}")
    print(f" Container: {'🟢 Exists' if info['container_exists'] else '🔴 Not found'}")
    print(f" Running: {'🟢 Yes' if info['container_running'] else '🔴 No'}")
    print(f" Ollama API: {'🟢 Accessible' if info['ollama_accessible'] else '🔴 Not accessible'}")

    # Decision logic
    if info['ollama_accessible']:
        print("\n✅ Ollama is ready!")
    else:
        print("\n🔧 Ollama needs setup...")
        started = False
        if info['container_exists']:
            print("Container exists but not running.")
            choice = input("Start existing container? (y/n): ").lower().strip()
            if choice == 'y':
                started = client.start_ollama()
                if started:
                    print("✅ Container started!")
                else:
                    print("❌ Failed to start container")
            setup_prompt = "Set up new container? (y/n): "
        else:
            print("No container found.")
            setup_prompt = "Set up new Ollama container? (y/n): "

        # Offer a fresh container whenever nothing was started above.
        if not started:
            if input(setup_prompt).lower().strip() != 'y':
                return
            client.setup_ollama()

    # Final check (also covers a failed setup_ollama call)
    if not client.check_ollama_running():
        print("❌ Ollama is still not accessible. Exiting.")
        return

    # List models and start chat
    client.list_models()
    print(f"\n💬 Using model: {client.model}")
    print("Commands: 'quit' to exit, 'clear' to clear history")
    print("-" * 50)

    conversation_history = []

    while True:
        try:
            user_input = input("\nYou: ").strip()
            command = user_input.lower()

            if command in ('quit', 'exit'):
                print("👋 Goodbye!")
                break
            if command == 'clear':
                conversation_history = []
                print("🗑️ Conversation cleared")
                continue
            if not user_input:
                continue

            print("Assistant: ", end='', flush=True)
            response, conversation_history = client.chat(user_input, conversation_history)
            # chat() does not stream, so the reply has to be printed here.
            print(response)

        except (KeyboardInterrupt, EOFError):
            # EOFError must end the loop: input() would raise it again on
            # every iteration once stdin is closed.
            print("\n\n👋 Goodbye!")
            break
        except Exception as e:
            print(f"\n❌ Error: {e}")
|
|
|
|
# Script entry point for the basic chat defined above.
# NOTE(review): a second `main` and a second `__main__` guard appear later in
# this file; when run as a script this guard calls the basic `main` first, and
# the later one presumably runs afterwards — confirm whether this early entry
# point is still wanted.
if __name__ == "__main__":
    main()
|
|
|
|
class DocumentContext:
    """Hold markdown documents in memory and build prompts that quote them.

    Attributes:
        client: Object whose ``generate_response(prompt, temperature=...)``
            is called by ``query_with_context``.
        loaded_documents: Maps each loaded file path to its text.
    """

    def __init__(self, client):
        self.client = client
        self.loaded_documents = {}

    def load_markdown_file(self, file_path):
        """Read ``file_path`` as UTF-8 and remember it.

        Returns:
            The file content, or None when the file cannot be read or decoded
            (a message is printed).
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                content = file.read()
        except (OSError, ValueError) as e:
            # ValueError covers UnicodeDecodeError and malformed path strings.
            print(f"❌ Error loading {file_path}: {e}")
            return None

        self.loaded_documents[file_path] = content
        print(f"✅ Loaded: {file_path} ({len(content)} characters)")
        return content

    def load_markdown_directory(self, directory_path):
        """Load every ``*.md`` file directly inside ``directory_path``."""
        # Sorted so the document order (and therefore what survives
        # truncation) is stable from run to run.
        for file_path in sorted(glob.glob(os.path.join(directory_path, "*.md"))):
            self.load_markdown_file(file_path)

    def _select_documents(self, document_names):
        """Return loaded paths matching ``document_names``, without duplicates.

        A name matching a full path or a base name exactly selects only those
        files; otherwise it selects every path containing it as a substring.
        """
        selected = []
        for name in document_names:
            matches = [
                path for path in self.loaded_documents
                if name == path or name == os.path.basename(path)
            ]
            if not matches:
                matches = [path for path in self.loaded_documents if name in path]
            for path in matches:
                if path not in selected:
                    selected.append(path)
        return selected

    def get_document_context(self, max_length=8000, document_names=None):
        """Concatenate documents under ``--- <basename> ---`` headers.

        Args:
            max_length: Maximum number of characters kept before a truncation
                notice is appended; None disables truncation.
            document_names: Optional names selecting a subset of the loaded
                documents; all documents are used when omitted or empty.
        """
        if document_names:
            paths = self._select_documents(document_names)
        else:
            paths = list(self.loaded_documents)

        context = "".join(
            f"--- {os.path.basename(path)} ---\n{self.loaded_documents[path]}\n\n"
            for path in paths
        )

        # Truncate if too long
        if max_length is not None and len(context) > max_length:
            context = context[:max_length] + "\n\n[Content truncated due to length]"

        return context

    def query_with_context(self, question, document_names=None, temperature=0.7):
        """Ask the model ``question`` with the documents quoted in the prompt.

        Args:
            question: The user's question.
            document_names: Optional subset of documents to quote; the
                selected text is not truncated. Without it, all documents
                are quoted, truncated to the default length.
            temperature: Sampling temperature forwarded to the client.

        Returns:
            The client's response, or an explanatory message when there is
            nothing to quote.
        """
        if not self.loaded_documents:
            return "No documents loaded. Please load some .md files first."

        # Get context from specific documents or all
        if document_names:
            context = self.get_document_context(
                max_length=None, document_names=document_names
            )
            if not context:
                return "None of the requested documents are loaded."
        else:
            context = self.get_document_context()

        prompt = (
            "Based on the following documents, please answer the question.\n"
            "\n"
            "DOCUMENTS:\n"
            f"{context}\n"
            "\n"
            f"QUESTION: {question}\n"
            "\n"
            "Please provide a comprehensive answer based on the documents above. "
            "If the information isn't in the documents, please mention that."
        )

        return self.client.generate_response(prompt, temperature=temperature)
|
|
|
|
# Client subclass that adds markdown-document context on top of OllamaDeepSeekClient.
|
|
class OllamaDeepSeekClientWithDocs(OllamaDeepSeekClient):
    """Ollama client that can quote loaded markdown documents in its prompts."""

    def __init__(self, base_url="http://localhost:11434"):
        super().__init__(base_url)
        # Document store; it calls back into this client to generate answers.
        self.doc_context = DocumentContext(self)

    def load_documents(self, path):
        """Load one ``.md`` file, or every ``.md`` file in a directory."""
        if os.path.isfile(path) and path.endswith('.md'):
            self.doc_context.load_markdown_file(path)
            return
        if os.path.isdir(path):
            self.doc_context.load_markdown_directory(path)
            return
        print("❌ Please provide a valid .md file or directory")

    def list_loaded_documents(self):
        """Print each loaded document with its size, followed by the totals."""
        documents = self.doc_context.loaded_documents
        if not documents:
            print("No documents loaded.")
            return

        print("\n📚 Loaded Documents:")
        running_total = 0
        for path, text in documents.items():
            size = len(text)
            running_total += size
            print(f" - {os.path.basename(path)} ({size} characters)")

        print(f"Total: {len(documents)} documents, {running_total} characters")

    def chat_with_context(self, message, use_documents=True):
        """Answer ``message``, quoting the loaded documents when requested.

        Falls back to a plain prompt when ``use_documents`` is false or no
        documents are loaded.
        """
        if not (use_documents and self.doc_context.loaded_documents):
            return self.generate_response(message)

        context = self.doc_context.get_document_context()
        prompt_lines = [
            "Based on the following documents, please answer the user's question.",
            "",
            "DOCUMENTS:",
            f"{context}",
            "",
            f"USER QUESTION: {message}",
            "",
            "Please provide a comprehensive answer based on the documents above. "
            "If the information isn't in the documents, please mention that and "
            "provide a general answer if possible.",
        ]
        return self.generate_response("\n".join(prompt_lines))
|
|
|
|
|
|
def main():
    """Run the document-aware interactive client.

    Prints a status summary, offers to start or create the Ollama container
    when the API is unreachable, then shows a menu for loading markdown
    documents and starting chat or query sessions. Ctrl-C or end of input at
    the menu exits cleanly.
    """
    client = OllamaDeepSeekClientWithDocs()

    print("🤖 Ollama DeepSeek with Document Context")
    print("=" * 50)

    # Setup Ollama
    info = client.get_system_info()
    print("\nSystem Status:")
    print(f" Port 11434: {'🔴 Used' if info['port_11434_used'] else '🟢 Free'}")
    print(f" Container: {'🟢 Exists' if info['container_exists'] else '🔴 Not found'}")
    print(f" Running: {'🟢 Yes' if info['container_running'] else '🔴 No'}")
    print(f" Ollama API: {'🟢 Accessible' if info['ollama_accessible'] else '🔴 Not accessible'}")

    if not info['ollama_accessible']:
        print("\n🔧 Ollama needs setup...")
        if info['container_exists']:
            choice = input("Start existing container? (y/n): ").lower().strip()
            if choice == 'y':
                if not client.start_ollama():
                    print("❌ Failed to start container")
                    return
            else:
                choice = input("Set up new container? (y/n): ").lower().strip()
                if choice != 'y' or not client.setup_ollama():
                    return
        else:
            choice = input("Set up new Ollama container? (y/n): ").lower().strip()
            if choice != 'y' or not client.setup_ollama():
                return

    # Final check
    if not client.check_ollama_running():
        print("❌ Ollama is not accessible. Exiting.")
        return

    # List available models
    client.list_models()
    print(f"\n💬 Using model: {client.model}")

    # Document loading interface
    print("\n📁 Document Loading Options")
    print("-" * 40)

    while True:
        print("\nOptions:")
        print("1. Load a markdown file")
        print("2. Load all markdown files from directory")
        print("3. List loaded documents")
        print("4. Clear all loaded documents")
        print("5. Start chat with documents")
        print("6. Start chat without documents")
        print("7. Query specific documents")
        print("8. Exit")

        try:
            choice = input("\nChoose option (1-8): ").strip()

            if choice == '1':
                # expanduser lets the user type paths such as ~/notes/a.md.
                file_path = os.path.expanduser(input("Enter markdown file path: ").strip())
                if os.path.exists(file_path) and file_path.endswith('.md'):
                    client.load_documents(file_path)
                else:
                    print("❌ File not found or not a .md file")

            elif choice == '2':
                dir_path = os.path.expanduser(input("Enter directory path: ").strip())
                if os.path.exists(dir_path) and os.path.isdir(dir_path):
                    client.load_documents(dir_path)
                else:
                    print("❌ Directory not found")

            elif choice == '3':
                client.list_loaded_documents()

            elif choice == '4':
                client.doc_context.loaded_documents.clear()
                print("🗑️ All documents cleared")

            elif choice == '5':
                if client.doc_context.loaded_documents:
                    start_chat_session(client, use_documents=True)
                else:
                    print("❌ No documents loaded. Please load documents first.")

            elif choice == '6':
                start_chat_session(client, use_documents=False)

            elif choice == '7':
                if client.doc_context.loaded_documents:
                    query_specific_documents(client)
                else:
                    print("❌ No documents loaded. Please load documents first.")

            elif choice == '8':
                print("👋 Goodbye!")
                break

            else:
                print("❌ Invalid choice. Please enter 1-8.")

        except (KeyboardInterrupt, EOFError):
            # Leave the menu quietly instead of dumping a traceback.
            print("\n👋 Goodbye!")
            break
|
|
|
|
def start_chat_session(client, use_documents=True):
    """Run an interactive chat loop until the user goes back to the menu.

    Args:
        client: Object providing ``doc_context.loaded_documents``,
            ``chat_with_context(message, use_documents=...)`` and
            ``chat(message, history)``.
        use_documents: Initial mode; the 'documents' command toggles it.

    The loop ends on 'back', Ctrl-C or end of input. Conversation history is
    only used in the mode without documents.
    """
    conversation_history = []

    if use_documents:
        print(f"\n💡 Chatting with {len(client.doc_context.loaded_documents)} loaded documents")
        print("The model will reference your documents automatically.")
    else:
        print("\n💬 Regular chat mode (no document context)")

    print("Commands: 'back' to return, 'clear' to clear history, 'documents' to toggle document usage")
    print("-" * 60)

    while True:
        try:
            user_input = input("\nYou: ").strip()
            command = user_input.lower()

            if command == 'back':
                break
            if command == 'clear':
                conversation_history = []
                print("🗑️ Conversation history cleared")
                continue
            if command == 'documents':
                use_documents = not use_documents
                mode = "with documents" if use_documents else "without documents"
                print(f"🔄 Switched to chat {mode}")
                continue
            if not user_input:
                continue

            print("Assistant: ", end='', flush=True)

            if use_documents:
                # Use document context for each query
                response = client.chat_with_context(user_input, use_documents=True)
            else:
                # Regular chat without documents
                response, conversation_history = client.chat(user_input, conversation_history)

            print(response if response else "❌ No response generated")

        except (KeyboardInterrupt, EOFError):
            # EOFError must end the loop: input() would raise it again on
            # every iteration once stdin is closed.
            print("\n\nReturning to main menu...")
            break
        except Exception as e:
            print(f"\n❌ Error: {e}")
|
|
|
|
def query_specific_documents(client):
    """Let the user pick loaded documents by number and ask one question.

    Args:
        client: Object providing ``doc_context.loaded_documents`` and
            ``doc_context.query_with_context(question, document_names=...)``.

    Documents are listed by base name, but the full paths are passed to the
    query so that a pick cannot also match other files whose names merely
    contain it. Repeated numbers select a document only once.
    """
    print("\n📚 Loaded Documents:")
    doc_paths = list(client.doc_context.loaded_documents)
    for number, path in enumerate(doc_paths, start=1):
        print(f" {number}. {os.path.basename(path)}")

    print("\nSelect documents to query (comma-separated numbers, or 'all'):")
    selection = input("Selection: ").strip()

    if selection.lower() == 'all':
        selected_docs = doc_paths
        print("✅ Using all documents")
    else:
        try:
            indices = [int(x.strip()) - 1 for x in selection.split(',')]
        except ValueError:
            print("❌ Invalid selection")
            return

        selected_docs = []
        for idx in indices:
            # Out-of-range numbers are ignored; duplicates are kept once.
            if 0 <= idx < len(doc_paths) and doc_paths[idx] not in selected_docs:
                selected_docs.append(doc_paths[idx])

        if not selected_docs:
            print("❌ No valid documents selected")
            return
        chosen_names = ', '.join(os.path.basename(path) for path in selected_docs)
        print(f"✅ Using documents: {chosen_names}")

    print("\nEnter your question about the selected documents:")
    question = input("Question: ").strip()

    if not question:
        print("❌ No question provided")
        return

    print("\nAssistant: ", end='', flush=True)
    response = client.doc_context.query_with_context(question, document_names=selected_docs)
    if response:
        print(response)
    else:
        print("❌ No response generated")
|
|
|
|
|
|
# def main():
|
|
# """Main interactive chat function"""
|
|
# client = OllamaDeepSeekClient()
|
|
|
|
# print("🤖 Ollama DeepSeek V2 Lite Chat")
|
|
# print("=" * 50)
|
|
|
|
# # Check container status
|
|
# status = client.get_container_status()
|
|
# if status:
|
|
# print(f"Container Status: {status['status']}")
|
|
# print(f"Ports: {status['ports']}")
|
|
# else:
|
|
# print("No container found.")
|
|
|
|
# # Check if Ollama is running
|
|
# if not client.check_ollama_running():
|
|
# print("\nOllama is not running.")
|
|
# choice = input("Do you want to start the existing container? (y/n): ").lower().strip()
|
|
|
|
# if choice == 'y':
|
|
# if not client.start_ollama_container():
|
|
# print("Failed to start existing container.")
|
|
# choice = input("Do you want to set up a new container? (y/n): ").lower().strip()
|
|
# if choice == 'y':
|
|
# if not client.setup_ollama_container():
|
|
# print("Failed to set up Ollama. Exiting.")
|
|
# return
|
|
# else:
|
|
# print("Exiting.")
|
|
# return
|
|
# else:
|
|
# choice = input("Do you want to set up a new container? (y/n): ").lower().strip()
|
|
# if choice == 'y':
|
|
# if not client.setup_ollama_container():
|
|
# print("Failed to set up Ollama. Exiting.")
|
|
# return
|
|
# else:
|
|
# print("Exiting.")
|
|
# return
|
|
|
|
# # List available models
|
|
# print(f"\nUsing model: {client.model}")
|
|
# print("Base URL:", client.base_url)
|
|
# client.list_models()
|
|
|
|
# print("\nCommands: 'quit' to exit, 'clear' to clear history, 'status' for container info\n")
|
|
|
|
# conversation_history = []
|
|
|
|
# while True:
|
|
# try:
|
|
# user_input = input("You: ").strip()
|
|
|
|
# if user_input.lower() in ['quit', 'exit']:
|
|
# print("Goodbye! 👋")
|
|
# break
|
|
# elif user_input.lower() == 'clear':
|
|
# conversation_history = []
|
|
# print("Conversation history cleared.")
|
|
# continue
|
|
# elif user_input.lower() == 'status':
|
|
# status = client.get_container_status()
|
|
# if status:
|
|
# print(f"Container: {status['name']}")
|
|
# print(f"Status: {status['status']}")
|
|
# print(f"Ports: {status['ports']}")
|
|
# else:
|
|
# print("No container found.")
|
|
# continue
|
|
# elif not user_input:
|
|
# continue
|
|
|
|
# print("Assistant: ", end='')
|
|
# response, conversation_history = client.chat(user_input, conversation_history)
|
|
|
|
# except KeyboardInterrupt:
|
|
# print("\n\nGoodbye! 👋")
|
|
# break
|
|
# except Exception as e:
|
|
# print(f"\nError: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
# Script entry point: call main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|
|