https://github.com/virtualagc/virtualagc
Raw File
Tip revision: a8d66c8ae3c4d967053e2bfb4298e90182cbeff2 authored by Ron Burkey on 06 March 2024, 12:40:55 UTC
Minor tweaks associated with supporting Skylark 48.
Tip revision: a8d66c8
piDSKY.py
#!/usr/bin/python3
# Copyright:	None, placed in the PUBLIC DOMAIN by its author (Ron Burkey)
# Filename: 	piDSKY.py
# Purpose:	This is an illustration of how to use the skeleton peripheral program 
#		piPeripheral.py to create a simple simulated DSKY.
# Reference:	http://www.ibiblio.org/apollo/developer.html
# Mod history:	2017-11-17 RSB	Began.
#               2017-11-21 RSB	Updated with some fixes to the PRO and NOUN
#				keys that had been identified for piDSKY2.py.
#		2017-12-02 RSB	Replaced the entire program with a stripped form
#				of piDSKY2.py (in which all hardware-specific stuff
#				has been removed), because it was easier than 
#				back-porting bug-fixes.
#		2018-01-06 MAS	Switched the TEMP light to use channel 163 instead
#				of channel 11.
#
# Note that certain functionality (I think the code for get_char_keyboard_nonblock)
# might not work under Windows, but everything should
# presumably work in Linux, Raspbian, Mac OS X, etc.
#
# In this skeleton form, the script acts as kind of a console-based DSKY, in which
# you can use keyboard keys (0 1 2 3 4 5 6 7 8 9 + - V N C P K R Enter) as surrogates
# for DSKY pushbuttons, and all DSKY-related outputs from yaAGC are simply parsed and
# displayed in textual form, though the idea is that in general, you'd rip out all of 
# that DSKY-specific stuff and replace it with whatever you wanted.
#
# The parts which need to be modified to be target-system specific are the 
# outputFromAGC() and inputsForAGC() functions, which are in the section *after* the following
# section.  The immediately following section, on the other hand, has some utility functions I use
# for the default outputFromAGC() and inputsForAGC() functions I provide, and
# can be deleted if they're not useful for the specific implementation desired.
#
# To run the program in its present form, you have to use yaAGC, and optionally
# yaDSKY2 (if you want to see the graphical DSKY and piPeripheral.py working in
# parallel).  To do that, assuming you had a directory setup in which all of the
# appropriate files could be found, you could run (presumably from different consoles)
#
#	yaDSKY2 --cfg=LM.ini --port=19797
#	yaAGC --core=Luminary099.bin --port=19797 --cfg=LM.ini
#	piDSKY.py
#
# If you didn't want to use yaDSKY2, then this stuff could all be run in a pure
# command-line environment without a GUI desktop.

import time
import os
import signal
import sys
import argparse
import threading
import termios
import fcntl
import socket

# Parse command-line arguments.
cli = argparse.ArgumentParser()
cli.add_argument("--host", help="Host address of yaAGC, defaulting to localhost.")
cli.add_argument("--port", help="Port for yaAGC, defaulting to 19798.", type=int)
cli.add_argument("--slow", help="For use on really slow host systems.")
args = cli.parse_args()

# Responsiveness settings.
if args.slow:
	PULSE = 0.25
	lampDeadtime = 0.25
else:
	PULSE = 0.05
	lampDeadtime = 0.1

# Characteristics of the host and port being used for yaAGC communications.  
if args.host:
	TCP_IP = args.host
else:
	TCP_IP = 'localhost'
if args.port:
	TCP_PORT = args.port
else:
	TCP_PORT = 19798

###################################################################################
# Some utilities I happen to use in my sample hardware abstraction functions, but
# not of value outside of that, unless you happen to be implementing DSKY functionality
# in a similar way.

# Given a 3-tuple (channel,value,mask), creates packet data and sends it to yaAGC.
def packetize(tuple):
	outputBuffer = bytearray(4)
	# First, create and output the mask command.
	outputBuffer[0] = 0x20 | ((tuple[0] >> 3) & 0x0F)
	outputBuffer[1] = 0x40 | ((tuple[0] << 3) & 0x38) | ((tuple[2] >> 12) & 0x07)
	outputBuffer[2] = 0x80 | ((tuple[2] >> 6) & 0x3F)
	outputBuffer[3] = 0xC0 | (tuple[2] & 0x3F)
	s.send(outputBuffer)
	# Now, the actual data for the channel.
	outputBuffer[0] = 0x00 | ((tuple[0] >> 3) & 0x0F)
	outputBuffer[1] = 0x40 | ((tuple[0] << 3) & 0x38) | ((tuple[1] >> 12) & 0x07)
	outputBuffer[2] = 0x80 | ((tuple[1] >> 6) & 0x3F)
	outputBuffer[3] = 0xC0 | (tuple[1] & 0x3F)
	s.send(outputBuffer)

# This particular function parses various keystrokes, like '0' or 'V' and creates
# packets as if they were DSKY keypresses.  It should be called occasionally as
# parseDskyKey(0) if there are no keystrokes, in order to make sure that the PRO
# key gets released.  

# The return value of this function is
# a list ([...]), of which each element is a 3-tuple consisting of an AGC channel
# number, a value for that channel, and a bitmask that tells which bit-positions
# of the value are valid.  The returned list can be empty.  For example, a
# return value of 
#	[ ( 0o15, 0o31, 0o37 ) ]
# would indicate that the lowest 5 bits of channel 15 (octal) were valid, and that
# the value of those bits were 11001 (binary), which collectively indicate that
# the KEY REL key on a DSKY is pressed.
resetCount = 0
def parseDskyKey(ch):
	global resetCount
	if ch == 'R':
		resetCount += 1
		if resetCount >= 5:
			print("Exiting ...")
			return ""
	elif ch != "":
		resetCount = 0
	returnValue = []
	if ch == '0':
		returnValue.append( (0o15, 0o20, 0o37) )
	elif ch == '1':
		returnValue.append( (0o15, 0o1, 0o37) )
	elif ch == '2':
    		returnValue.append( (0o15, 0o2, 0o37) )
	elif ch == '3':
    		returnValue.append( (0o15, 0o3, 0o37) )
	elif ch == '4':
    		returnValue.append( (0o15, 0o4, 0o37) )
	elif ch == '5':
    		returnValue.append( (0o15, 0o5, 0o37) )
	elif ch == '6':
    		returnValue.append( (0o15, 0o6, 0o37) )
	elif ch == '7':
    		returnValue.append( (0o15, 0o7, 0o37) )
	elif ch == '8':
    		returnValue.append( (0o15, 0o10, 0o37) )
	elif ch == '9':
    		returnValue.append( (0o15, 0o11, 0o37) )
	elif ch == '+':
    		returnValue.append( (0o15, 0o32, 0o37) )
	elif ch == '-':
    		returnValue.append( (0o15, 0o33, 0o37) )
	elif ch == 'V':
    		returnValue.append( (0o15, 0o21, 0o37) )
	elif ch == 'N':
    		returnValue.append( (0o15, 0o37, 0o37) )
	elif ch == 'R':
    		returnValue.append( (0o15, 0o22, 0o37) )
	elif ch == 'C':
    		returnValue.append( (0o15, 0o36, 0o37) )
	elif ch == 'P':
    		returnValue.append( (0o32, 0o00000, 0o20000) )
	elif ch == 'p' or ch == 'PR':
    		returnValue.append( (0o32, 0o20000, 0o20000) )
	elif ch == 'K':
    		returnValue.append( (0o15, 0o31, 0o37) )
	elif ch == '\n':
		returnValue.append( (0o15, 0o34, 0o37) )
	return returnValue	

# This function turns keyboard echo on or off.
def echoOn(control):
	fd = sys.stdin.fileno()
	new = termios.tcgetattr(fd)
	if control:
		print("Keyboard echo on")
		new[3] |= termios.ECHO
	else:
		print("Keyboard echo off")
		new[3] &= ~termios.ECHO
	termios.tcsetattr(fd, termios.TCSANOW, new)
echoOn(False)

# This function is a non-blocking read of a single character from the
# keyboard.  Returns either the key value (such as '0' or 'V'), or else
# the value "" if no key was pressed.  Note:  fakes a "key" 
# 'PR' 0.75 seconds after a key 'p' or 'P'.  This is in lieu of PRO
# press and release events.  Is is possible to get keypress and release
# events or other equivalent data from the Python "keyboard" module, but
# I didn't know about it at first, and am too lazy to go back and add
# that support.
pressedPRO = False
timePRO = 0
def get_char_keyboard_nonblock():
	global pressedPRO, timePRO
	fd = sys.stdin.fileno()
	oldterm = termios.tcgetattr(fd)
	newattr = termios.tcgetattr(fd)
	newattr[3] = newattr[3] & ~termios.ICANON & ~termios.ECHO
	termios.tcsetattr(fd, termios.TCSANOW, newattr)
	oldflags = fcntl.fcntl(fd, fcntl.F_GETFL)
	fcntl.fcntl(fd, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)
	c = ""
	try:
    		c = sys.stdin.read(1)
	except IOError: pass
	termios.tcsetattr(fd, termios.TCSAFLUSH, oldterm)
	fcntl.fcntl(fd, fcntl.F_SETFL, oldflags)
	if c == 'p' or c == 'P':
		pressedPRO = True
		timePRO = time.time()
	if c == "" and pressedPRO and time.time() > timePRO + 0.75:
		pressedPRO = False
		c = 'PR'
	return c

# The following dictionary gives, for each indicator lamp:
#	Whether or not it is currently lit.
# This information isn't actually used for anything, but can be useful in a 
# specific hardware model as a way to know which lamp statuses have changed.
lampStatuses = {
	"UPLINK ACTY" : { "isLit" : False },
	"TEMP" : { "isLit" : False },
	"NO ATT" : { "isLit" : False },
	"GIMBAL LOCK" : { "isLit" : False },
	"DSKY STANDBY" : { "isLit" : False },
	"PROG" : { "isLit" : False },
	"KEY REL" : { "isLit" : False },
	"RESTART" : { "isLit" : False },
	"OPR ERR" : { "isLit" : False },
	"TRACKER" : { "isLit" : False },
	"PRIO DSP" : { "isLit" : False },
	"ALT" : { "isLit" : False },
	"NO DAP" : { "isLit" : False },
	"VEL" : { "isLit" : False }
}
# For modifying the lampStatuses[] array.
def updateLampStatuses(key, value):
	global lampStatuses
	if key in lampStatuses:
		lampStatuses[key]["isLit"] = value

# Converts a 5-bit code in channel 010 to " ", "0", ..., "9".
def codeToString(code):
	if code == 0:
		return " "
	elif code == 21:
		return "0"
	elif code == 3:
		return "1"
	elif code == 25:
		return "2"
	elif code == 27:
		return "3"
	elif code == 15:
		return "4"
	elif code == 30:
		return "5"
	elif code == 28:
		return "6"
	elif code == 19:
		return "7"
	elif code == 29:
		return "8"
	elif code == 31:
		return "9"
	return "?"

###################################################################################
# Hardware abstraction / User-defined functions.  Also, any other platform-specific
# initialization.

# This function is automatically called periodically by the event loop to check for 
# conditions that will result in sending messages to yaAGC that are interpreted
# as changes to bits on its input channels.  For test purposes, it simply polls the
# keyboard, and interprets various keystrokes as DSKY keys if present.  The return
# value is supposed to be a list of 3-tuples of the form
#	[ (channel0,value0,mask0), (channel1,value1,mask1), ...]
# and may be en empty list.  
def inputsForAGC():
	ch = get_char_keyboard_nonblock()
	ch = ch.upper()
	if ch == '_':
		ch = '-'
	elif ch == '=':
		ch = '+'
	else:
		returnValue = parseDskyKey(ch)
	if len(returnValue) > 0:
        	print("Sending to yaAGC: " + oct(returnValue[0][1]) + "(mask " + oct(returnValue[0][2]) + ") -> channel " + oct(returnValue[0][0]))
	return returnValue

def updateLamps():
	# If there were actual hardware, this is where you could use
	# lampStatus[] to control the lamps.
	return
updateLamps()

# This function is called by the event loop only when yaAGC has written
# to an output channel.  The function should do whatever it is that needs to be done
# with this output data, which is not processed additionally in any way by the 
# generic portion of the program. As a test, I simply display the outputs for 
# those channels relevant to the DSKY.
last10 = 1234567
last11 = 1234567
last13 = 1234567
last163 = 1234567
plusMinusState1 = 0
plusMinusState2 = 0
plusMinusState3 = 0
def outputFromAGC(channel, value):
	# These lastNN values are just used to cut down on the number of messages printed,
	# when the same value is output over and over again to the same channel, because
	# that makes debugging harder.  
	global last10, last11, last13, last163, plusMinusState1, plusMinusState2, plusMinusState3
	if (channel == 0o13):
		value &= 0o3000
	if (channel == 0o10 and value != last10) or (channel == 0o11 and value != last11) or (channel == 0o13 and value != last13) or (channel == 0o163 and value != last163):
		if channel == 0o10:
			last10 = value
			aaaa = (value >> 11) & 0x0F
			b = (value >> 10) & 0x01
			ccccc = (value >> 5) & 0x1F
			ddddd = value & 0x1F
			if aaaa != 12:
				sc = codeToString(ccccc)
				sd = codeToString(ddddd)
			if aaaa == 11:
				print(sc + " -> M1   " + sd + " -> M2")
			elif aaaa == 10:
				print(sc + " -> V1   " + sd + " -> V2")
			elif aaaa == 9:
				print(sc + " -> N1   " + sd + " -> N2")
			elif aaaa == 8:
				print("          " + sd + " -> 11")
			elif aaaa == 7:
				plusMinus = "  "
				if b != 0:
					plusMinus = "1+"
					plusMinusState1 |= 1
				else:
					plusMinusState1 &= ~1
				print(sc + " -> 12   " + sd + " -> 13   " + plusMinus)
			elif aaaa == 6:
				plusMinus = "  "
				if b != 0:
					plusMinus = "1-"
					plusMinusState1 |= 2
				else:
					plusMinusState1 &= ~2
				print(sc + " -> 14   " + sd + " -> 15   " + plusMinus)
			elif aaaa == 5:
				plusMinus = "  "
				if b != 0:
					plusMinus = "2+"
					plusMinusState2 |= 1
				else:
					plusMinusState2 &= ~1
				print(sc + " -> 21   " + sd + " -> 22   " + plusMinus)
			elif aaaa == 4:
				plusMinus = "  "
				if b != 0:
					plusMinus = "2-"
					plusMinusState2 |= 2
				else:
					plusMinusState2 &= ~2
				print(sc + " -> 23   " + sd + " -> 24   " + plusMinus)
			elif aaaa == 3:
				print(sc + " -> 25   " + sd + " -> 31")
			elif aaaa == 2:
				plusMinus = "  "
				if b != 0:
					plusMinus = "3+"
					plusMinusState3 |= 1
				else:
					plusMinusState3 &= ~1
				print(sc + " -> 32   " + sd + " -> 33   " + plusMinus)
			elif aaaa == 1:
				plusMinus = "  "
				if b != 0:
					plusMinus = "3-"
					plusMinusState3 |= 2
				else:
					plusMinusState3 &= ~2
				print(sc + " -> 34   " + sd + " -> 35   " + plusMinus)
			elif aaaa == 12:
				vel = "VEL OFF         "
				if (value & 0x04) != 0:
					vel = "VEL ON          "
					updateLampStatuses("VEL", True)
				else:
					updateLampStatuses("VEL", False)
				noAtt = "NO ATT OFF      "
				if (value & 0x08) != 0:
					noAtt = "NO ATT ON       "
					updateLampStatuses("NO ATT", True)
				else:
					updateLampStatuses("NO ATT", False)
				alt = "ALT OFF         "
				if (value & 0x10) != 0:
					alt = "ALT ON          "
					updateLampStatuses("ALT", True)
				else:
					updateLampStatuses("ALT", False)
				gimbalLock = "GIMBAL LOCK OFF "
				if (value & 0x20) != 0:
					gimbalLock = "GIMBAL LOCK ON  "
					updateLampStatuses("GIMBAL LOCK", True)
				else:
					updateLampStatuses("GIMBAL LOCK", False)
				tracker = "TRACKER OFF     "
				if (value & 0x80) != 0:
					tracker = "TRACKER ON      "
					updateLampStatuses("TRACKER", True)
				else:
					updateLampStatuses("TRACKER", False)
				prog = "PROG OFF        "
				if (value & 0x100) != 0:
					prog = "PROG ON         "
					updateLampStatuses("PROG", True)
				else:
					updateLampStatuses("PROG", False)
				print(vel + "   " + noAtt + "   " + alt + "   " + gimbalLock + "   " + tracker + "   " + prog)
				updateLamps()
		elif channel == 0o11:
			last11 = value
			compActy = "COMP ACTY OFF   "
			if (value & 0x02) != 0:
				compActy = "COMP ACTY ON    "
			uplinkActy = "UPLINK ACTY OFF "
			if (value & 0x04) != 0:
				uplinkActy = "UPLINK ACTY ON  "
				updateLampStatuses("UPLINK ACTY", True)
			else:
				updateLampStatuses("UPLINK ACTY", False)
			flashing = "V/N NO FLASH    "
			print(compActy + "   " + uplinkActy + "   " + "   " + flashing)
			updateLamps()
		elif channel == 0o13:
			last13 = value
			test = "DSKY TEST       "
			if (value & 0x200) == 0:
				test = "DSKY NO TEST    "
			print(test)
			updateLamps()
		elif channel == 0o163:
			last163 = value
			if (value & 0x08) != 0:
				temp = "TEMP ON         "
				updateLampStatuses("TEMP", True)
			else:
				temp = "TEMP OFF        "
				updateLampStatuses("TEMP", False)
			if (value & 0o400) != 0:
				standby = "DSKY STANDBY ON "
				updateLampStatuses("DSKY STANDBY", True)
			else:
				standby = "DSKY STANDBY OFF"
				updateLampStatuses("DSKY STANDBY", False)
			if (value & 0o20) != 0:
				keyRel = "KEY REL ON      "
				updateLampStatuses("KEY REL", True)
			else:
				keyRel = "KEY REL OFF     "
				updateLampStatuses("KEY REL", False)
			if (value & 0o100) != 0:
				oprErr = "OPR ERR FLASH   "
				updateLampStatuses("OPR ERR", True)
			else:
				oprErr = "OPR ERR OFF     "
				updateLampStatuses("OPR ERR", False)
			if (value & 0o200) != 0:
				restart = "RESTART ON    "
				updateLampStatuses("RESTART", True)
			else:
				restart = "RESTART OFF     "
				updateLampStatuses("RESTART", False)
			print(temp + "   " + standby + "   " + keyRel + "   " + oprErr + "   " + restart)
		else:
			print("Received from yaAGC: " + oct(value) + " -> channel " + oct(channel))
	return

###################################################################################
# Generic initialization (TCP socket setup).  Has no target-specific code, and 
# shouldn't need to be modified unless there are bugs.

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setblocking(0)

def connectToAGC():
	while True:
		try:
			s.connect((TCP_IP, TCP_PORT))
			print("Connected to yaAGC (" + TCP_IP + ":" + str(TCP_PORT) + ")")
			break
		except socket.error as msg:
			print("Could not connect to yaAGC (" + TCP_IP + ":" + str(TCP_PORT) + "), exiting: " + str(msg))
			time.sleep(1)
			# The following provides a clean exit from the program by simply 
			# hitting any key.  However if get_char_keyboard_nonblock isn't
			# defined, just delete the next 4 lines and use Ctrl-C to exit instead.
			ch = get_char_keyboard_nonblock()
			if ch != "":
				print("Exiting ...")
				sys.exit()

connectToAGC()

###################################################################################
# Event loop.  Just check periodically for output from yaAGC (in which case the
# user-defined callback function outputFromAGC is executed) or data in the 
# user-defined function inputsForAGC (in which case a message is sent to yaAGC).
# But this section has no target-specific code, and shouldn't need to be modified
# unless there are bugs.

def eventLoop():
	# Buffer for a packet received from yaAGC.
	packetSize = 4
	inputBuffer = bytearray(packetSize)
	leftToRead = packetSize
	view = memoryview(inputBuffer)
	
	didSomething = False
	while True:
		if not didSomething:
			time.sleep(PULSE)
		didSomething = False
		
		# Check for packet data received from yaAGC and process it.
		# While these packets are always exactly 4
		# bytes long, since the socket is non-blocking, any individual read
		# operation may yield less bytes than that, so the buffer may accumulate data
		# over time until it fills.	
		try:
			numNewBytes = s.recv_into(view, leftToRead)
		except:
			numNewBytes = 0
		if numNewBytes > 0:
			view = view[numNewBytes:]
			leftToRead -= numNewBytes
			if leftToRead == 0:
				# Prepare for next read attempt.
				view = memoryview(inputBuffer)
				leftToRead = packetSize
				# Parse the packet just read, and call outputFromAGC().
				# Start with a sanity check.
				ok = 1
				if (inputBuffer[0] & 0xF0) != 0x00:
					ok = 0
				elif (inputBuffer[1] & 0xC0) != 0x40:
					ok = 0
				elif (inputBuffer[2] & 0xC0) != 0x80:
					ok = 0
				elif (inputBuffer[3] & 0xC0) != 0xC0:
					ok = 0
				# Packet has the various signatures we expect.
				if ok == 0:
					# Note that, depending on the yaAGC version, it occasionally
					# sends either a 1-byte packet (just 0xFF, older versions)
					# or a 4-byte packet (0xFF 0xFF 0xFF 0xFF, newer versions)
					# just for pinging the client.  These packets hold no
					# data and need to be ignored, but for other corrupted packets
					# we print a message. And try to realign past the corrupted
					# bytes.
					if inputBuffer[0] != 0xff or inputBuffer[1] != 0xff or inputBuffer[2] != 0xff or inputBuffer[2] != 0xff:
						if inputBuffer[0] != 0xff:
							print("Illegal packet: " + hex(inputBuffer[0]) + " " + hex(inputBuffer[1]) + " " + hex(inputBuffer[2]) + " " + hex(inputBuffer[3]))
						for i in range(1,packetSize):
							if (inputBuffer[i] & 0xF0) == 0:
								j = 0
								for k in range(i,4):
									inputBuffer[j] = inputBuffer[k]
									j += 1
								view = view[j:]
								leftToRead = packetSize - j
				else:
					channel = (inputBuffer[0] & 0x0F) << 3
					channel |= (inputBuffer[1] & 0x38) >> 3
					value = (inputBuffer[1] & 0x07) << 12
					value |= (inputBuffer[2] & 0x3F) << 6
					value |= (inputBuffer[3] & 0x3F)
					outputFromAGC(channel, value)
				didSomething = True
		
		# Check for locally-generated data for which we must generate messages
		# to yaAGC over the socket.  In theory, the externalData list could contain
		# any number of channel operations, but in practice (at least for something
		# like a DSKY implementation) it will actually contain only 0 or 1 operations.
		externalData = inputsForAGC()
		if externalData == "":
			echoOn(True)
			return
		for i in range(0, len(externalData)):
			packetize(externalData[i])
			didSomething = True

eventLoop()

os._exit(0)



back to top