RoboComp Logo

A simple robotics framework.

State Machine Code Generation in Python

19 Jul 2016

Similarities between State Machine Code Generation in C++ and in Python

  • The language used to define the state machine is the same.
  • The parser for the file that contains the state machine definition is the same.
  • The framework used for the state machine code is the same.

Implementation of the state machine in Python

Robocompcdsl uses template files to generate code written in Python. I modified the following templates:

  • genericworker.py
  • specificworker.py

  • genericworker.py

This file contains the definition of the state machine, the slot functions, the definition of the signals for transitions between states, and the connections between signals and functions.

  • specificworker.py

This file contains the code that is executed when the state machine enters a state.

Example

  • genericworker.py

Example genericworker.py:

#
# Copyright (C) 2016 by YOUR NAME HERE
#
#    This file is part of RoboComp
#
#    RoboComp is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    RoboComp is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with RoboComp.  If not, see <http://www.gnu.org/licenses/>.

import sys
from PySide import *

class GenericWorker(QtCore.QObject):
	"""Generated base worker that builds the ``Machine_testcpp`` state machine.

	The class declares one Qt signal per transition, creates the states,
	registers the transitions, and connects every state's ``entered`` signal
	to a ``fun_<state>`` slot. The slots defined here are placeholders that
	print an error and exit; a subclass is expected to override them.
	"""

	kill = QtCore.Signal()

	# Signals for State Machine.
	# One signal per transition, named <source>to<destination>; emitting a
	# signal while its source state is active triggers that transition.
	test1totest1 = QtCore.Signal()
	test1totest2 = QtCore.Signal()
	test2totest3 = QtCore.Signal()
	test2totest5 = QtCore.Signal()
	test2totest6 = QtCore.Signal()
	test3totest3 = QtCore.Signal()
	test3totest4 = QtCore.Signal()
	test4totest5 = QtCore.Signal()
	test5totest6 = QtCore.Signal()
	test1sub1totest1sub1 = QtCore.Signal()
	test1sub2totest1sub2 = QtCore.Signal()
	test1sub21totest1sub21 = QtCore.Signal()
	test1sub21totest1sub22 = QtCore.Signal()
	test3sub1totest3sub1 = QtCore.Signal()
	test3sub2totest3sub2 = QtCore.Signal()
	test5sub1totest1sub2 = QtCore.Signal()
	test1sub2totest5sub1 = QtCore.Signal()

	#-------------------------

	def __init__(self, mprx):
		"""Create the states, transitions and slot connections.

		@param mprx Mapping of proxy names to proxies; the entry
		            "JointMotorProxy" is read and stored.
		"""
		super(GenericWorker, self).__init__()

		self.jointmotor_proxy = mprx["JointMotorProxy"]

		# State Machine: top-level states.
		self.Machine_testcpp = QtCore.QStateMachine()
		self.test2 = QtCore.QState(self.Machine_testcpp)
		self.test4 = QtCore.QState(self.Machine_testcpp)
		self.test5 = QtCore.QState(self.Machine_testcpp)

		self.test6 = QtCore.QFinalState(self.Machine_testcpp)

		# test1 and test3 are parallel states.
		self.test3 = QtCore.QState(QtCore.QState.ParallelStates, self.Machine_testcpp)
		self.test1 = QtCore.QState(QtCore.QState.ParallelStates, self.Machine_testcpp)

		# Children of test1.
		self.test1sub1 = QtCore.QState(self.test1)
		self.test1sub2 = QtCore.QState(self.test1)

		# Children of test1sub2.
		self.test1sub21 = QtCore.QState(self.test1sub2)

		self.test1sub22 = QtCore.QFinalState(self.test1sub2)

		# Children of test3.
		self.test3sub1 = QtCore.QState(self.test3)
		self.test3sub2 = QtCore.QState(self.test3)
		self.test3sub3 = QtCore.QState(self.test3)

		# Children of test5.
		# NOTE(review): test5 is not created with ParallelStates and no
		# setInitialState() is called on it below -- confirm this is intended.
		self.test5sub1 = QtCore.QState(self.test5)
		self.test5sub2 = QtCore.QState(self.test5)

		#------------------
		# Initialization State machine: register each transition with the
		# signal that triggers it and its target state.
		self.test1.addTransition(self.test1totest1, self.test1)
		self.test1.addTransition(self.test1totest2, self.test2)
		self.test2.addTransition(self.test2totest3, self.test3)
		self.test2.addTransition(self.test2totest5, self.test5)
		self.test2.addTransition(self.test2totest6, self.test6)
		self.test3.addTransition(self.test3totest3, self.test3)
		self.test3.addTransition(self.test3totest4, self.test4)
		self.test4.addTransition(self.test4totest5, self.test5)
		self.test5.addTransition(self.test5totest6, self.test6)
		self.test1sub1.addTransition(self.test1sub1totest1sub1, self.test1sub1)
		self.test1sub2.addTransition(self.test1sub2totest1sub2, self.test1sub2)
		self.test1sub21.addTransition(self.test1sub21totest1sub21, self.test1sub21)
		self.test1sub21.addTransition(self.test1sub21totest1sub22, self.test1sub22)
		self.test3sub1.addTransition(self.test3sub1totest3sub1, self.test3sub1)
		self.test3sub2.addTransition(self.test3sub2totest3sub2, self.test3sub2)
		self.test5sub1.addTransition(self.test5sub1totest1sub2, self.test1sub2)
		self.test1sub2.addTransition(self.test1sub2totest5sub1, self.test5sub1)

		# Run the matching fun_<state> slot whenever a state is entered.
		self.test2.entered.connect(self.fun_test2)
		self.test3.entered.connect(self.fun_test3)
		self.test4.entered.connect(self.fun_test4)
		self.test5.entered.connect(self.fun_test5)
		self.test1.entered.connect(self.fun_test1)
		self.test6.entered.connect(self.fun_test6)
		self.test1sub1.entered.connect(self.fun_test1sub1)
		self.test1sub2.entered.connect(self.fun_test1sub2)
		self.test1sub21.entered.connect(self.fun_test1sub21)
		self.test1sub22.entered.connect(self.fun_test1sub22)
		self.test3sub1.entered.connect(self.fun_test3sub1)
		self.test3sub2.entered.connect(self.fun_test3sub2)
		self.test3sub3.entered.connect(self.fun_test3sub3)
		self.test5sub1.entered.connect(self.fun_test5sub1)
		self.test5sub2.entered.connect(self.fun_test5sub2)

		self.Machine_testcpp.setInitialState(self.test1)
		self.test1sub2.setInitialState(self.test1sub21)

		#------------------

		self.mutex = QtCore.QMutex(QtCore.QMutex.Recursive)
		self.Period = 30
		self.timer = QtCore.QTimer(self)

	# Slot functions of the State Machine.
	# Each placeholder below reports that the subclass did not provide the
	# slot and terminates the process with exit status -1.
	@QtCore.Slot()
	def fun_test2(self):
		"""Placeholder slot for state test2."""
		print("Error: lack fun_test2 in Specificworker")
		sys.exit(-1)

	@QtCore.Slot()
	def fun_test3(self):
		"""Placeholder slot for state test3."""
		print("Error: lack fun_test3 in Specificworker")
		sys.exit(-1)

	@QtCore.Slot()
	def fun_test4(self):
		"""Placeholder slot for state test4."""
		print("Error: lack fun_test4 in Specificworker")
		sys.exit(-1)

	@QtCore.Slot()
	def fun_test5(self):
		"""Placeholder slot for state test5."""
		print("Error: lack fun_test5 in Specificworker")
		sys.exit(-1)

	@QtCore.Slot()
	def fun_test1(self):
		"""Placeholder slot for state test1."""
		print("Error: lack fun_test1 in Specificworker")
		sys.exit(-1)

	@QtCore.Slot()
	def fun_test6(self):
		"""Placeholder slot for state test6."""
		print("Error: lack fun_test6 in Specificworker")
		sys.exit(-1)

	@QtCore.Slot()
	def fun_test1sub1(self):
		"""Placeholder slot for state test1sub1."""
		print("Error: lack fun_test1sub1 in Specificworker")
		sys.exit(-1)

	@QtCore.Slot()
	def fun_test1sub2(self):
		"""Placeholder slot for state test1sub2."""
		print("Error: lack fun_test1sub2 in Specificworker")
		sys.exit(-1)

	@QtCore.Slot()
	def fun_test1sub21(self):
		"""Placeholder slot for state test1sub21."""
		print("Error: lack fun_test1sub21 in Specificworker")
		sys.exit(-1)

	@QtCore.Slot()
	def fun_test1sub22(self):
		"""Placeholder slot for state test1sub22."""
		print("Error: lack fun_test1sub22 in Specificworker")
		sys.exit(-1)

	@QtCore.Slot()
	def fun_test3sub1(self):
		"""Placeholder slot for state test3sub1."""
		print("Error: lack fun_test3sub1 in Specificworker")
		sys.exit(-1)

	@QtCore.Slot()
	def fun_test3sub2(self):
		"""Placeholder slot for state test3sub2."""
		print("Error: lack fun_test3sub2 in Specificworker")
		sys.exit(-1)

	@QtCore.Slot()
	def fun_test3sub3(self):
		"""Placeholder slot for state test3sub3."""
		print("Error: lack fun_test3sub3 in Specificworker")
		sys.exit(-1)

	@QtCore.Slot()
	def fun_test5sub1(self):
		"""Placeholder slot for state test5sub1."""
		print("Error: lack fun_test5sub1 in Specificworker")
		sys.exit(-1)

	@QtCore.Slot()
	def fun_test5sub2(self):
		"""Placeholder slot for state test5sub2."""
		print("Error: lack fun_test5sub2 in Specificworker")
		sys.exit(-1)

	#-------------------------
	@QtCore.Slot()
	def killYourSelf(self):
		"""Announce shutdown and emit the ``kill`` signal."""
		# The original called rDebug(), which is not defined or imported in
		# this file, so the NameError prevented kill from ever being emitted.
		print("Killing myself")
		self.kill.emit()

	# \brief Change compute period
	# @param p Period in ms
	@QtCore.Slot(int)
	def setPeriod(self, p):
		"""Store the new period and restart the timer with it."""
		print("Period changed " + str(p))
		# Both names must be instance attributes: the bare ``Period`` and
		# ``timer`` used previously raised NameError and left the worker's
		# state untouched.
		self.Period = p
		self.timer.start(self.Period)

  • specificworker.py

Example specificworker.py:

#
# Copyright (C) 2016 by YOUR NAME HERE
#
#    This file is part of RoboComp
#
#    RoboComp is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    RoboComp is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with RoboComp.  If not, see <http://www.gnu.org/licenses/>.
#

import sys, os, Ice, traceback, time

from PySide import *
from genericworker import *

# Resolve the RoboComp installation directory from the environment, falling
# back to /opt/robocomp when the variable is absent.
try:
	ROBOCOMP = os.environ['ROBOCOMP']
except KeyError:
	# Only a missing variable is expected here; any other error should surface.
	print('$ROBOCOMP environment variable not set, using the default value /opt/robocomp')
	ROBOCOMP = '/opt/robocomp'
# A variable that is set but empty cannot be used to build paths.
if not ROBOCOMP:
	print('genericworker.py: ROBOCOMP environment variable not set! Exiting.')
	sys.exit()

# Option string prefix passed to Ice.loadSlice; the .ice file name is appended.
preStr = "-I"+ROBOCOMP+"/interfaces/ --all "+ROBOCOMP+"/interfaces/"
Ice.loadSlice(preStr+"JointMotor.ice")
from RoboCompJointMotor import *

class SpecificWorker(GenericWorker):
	"""User-editable worker holding the code run when each state is entered.

	Every ``fun_<state>`` slot overrides the placeholder of the same name in
	GenericWorker. The bodies are empty stubs to be filled in by the user.
	"""

	def __init__(self, proxy_map):
		"""Initialise the base worker, start the timer and the state machine.

		@param proxy_map Proxy mapping forwarded to GenericWorker.__init__.
		"""
		super(SpecificWorker, self).__init__(proxy_map)
		self.Period = 2000
		self.timer.start(self.Period)
		self.Machine_testcpp.start()

	def setParams(self, params):
		"""Accept configuration parameters; currently ignores them.

		@param params Configuration parameters (unused by this stub).
		@return Always True.
		"""
		# Template example of how a parameter could be read:
		#try:
		#	par = params["InnerModelPath"]
		#	innermodel_path=par.value
		#	innermodel = InnerModel(innermodel_path)
		#except:
		#	traceback.print_exc()
		#	print "Error reading config params"
		return True

	# Slot functions of the State Machine
	#
	# fun_test2
	#
	@QtCore.Slot()
	def fun_test2(self):
		pass

	#
	# fun_test3
	#
	@QtCore.Slot()
	def fun_test3(self):
		pass

	#
	# fun_test4
	#
	@QtCore.Slot()
	def fun_test4(self):
		pass

	#
	# fun_test5
	#
	@QtCore.Slot()
	def fun_test5(self):
		pass

	#
	# fun_test1
	#
	@QtCore.Slot()
	def fun_test1(self):
		pass

	#
	# fun_test6
	#
	@QtCore.Slot()
	def fun_test6(self):
		pass

	#
	# fun_test1sub1
	#
	@QtCore.Slot()
	def fun_test1sub1(self):
		pass

	#
	# fun_test1sub2
	#
	@QtCore.Slot()
	def fun_test1sub2(self):
		pass

	#
	# fun_test1sub21
	#
	@QtCore.Slot()
	def fun_test1sub21(self):
		pass

	#
	# fun_test1sub22
	#
	@QtCore.Slot()
	def fun_test1sub22(self):
		pass

	#
	# fun_test3sub1
	#
	@QtCore.Slot()
	def fun_test3sub1(self):
		pass

	#
	# fun_test3sub2
	#
	@QtCore.Slot()
	def fun_test3sub2(self):
		pass

	#
	# fun_test3sub3
	#
	@QtCore.Slot()
	def fun_test3sub3(self):
		pass

	#
	# fun_test5sub1
	#
	@QtCore.Slot()
	def fun_test5sub1(self):
		pass

	#
	# fun_test5sub2
	#
	@QtCore.Slot()
	def fun_test5sub2(self):
		pass

Emit Signal

To perform a transition between states, we have to emit the corresponding signal. The signal is emitted as follows:

self.statesrctostatedst.emit()