一、通过synchronized关键字
public class test {
public static void main(String[] args) {
MyObject object=new MyObject();
ThreadA threadA=new ThreadA(object);
ThreadB threadB=new ThreadB(object);
threadA.start();
threadB.start();
}
}
class MyObject{
synchronized public void methodA(){
//do something
}
synchronized public void methodB(){
//do something
}
}
class ThreadA extends Thread{
private MyObject object;
public ThreadA(MyObject object){
this.object=object;
}
@Override
public void run(){
super.run();
object.methodA();
}
}
class ThreadB extends Thread{
private MyObject object;
public ThreadB(MyObject object){
this.object=object;
}
@Override
public void run(){
super.run();
object.methodB();
}
}
二、wait和notify
import org.omg.Messaging.SYNC_WITH_TRANSPORT;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
public class test {
public static void main(String[] args) {
try {
Object lock=new Object();
ThreadA a=new ThreadA(lock);
a.start();
Thread.sleep(1000);
ThreadB b=new ThreadB(lock);
b.start();
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
class MyList{
private static List<String> list=new ArrayList<>();
public static void add(){
list.add("test");
}
public static int size(){
return list.size();
}
}
class ThreadA extends Thread{
private Object lock;
public ThreadA(Object lock){
super();
this.lock=lock;
}
@Override
public void run(){
try{
synchronized (lock){
if(MyList.size()!=5){
System.out.println("wait begin:"+System.currentTimeMillis());
lock.wait();
System.out.println("wait end:"+System.currentTimeMillis());
}
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
class ThreadB extends Thread{
private Object lock;
public ThreadB(Object lock){
super();
this.lock=lock;
}
@Override
public void run(){
try{
synchronized (lock){
for(int i=0;i<10;i++){
MyList.add();
if(MyList.size()==5){
lock.notifyAll();
System.out.println("已经发出了通知");
}
System.out.println("添加了"+(i+1)+"个元素");
Thread.sleep(1000);
}
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
结果:
wait begin:1602418383158
添加了1个元素
添加了2个元素
添加了3个元素
添加了4个元素
已经发出了通知
添加了5个元素
添加了6个元素
添加了7个元素
添加了8个元素
添加了9个元素
添加了10个元素
wait end:1602418394166
线程A要等待某个条件满足时(list.size()==5),才执行操作。线程B则向list中添加元素,改变list 的size。
A,B之间如何通信的呢?也就是说,线程A如何知道 list.size() 已经为5了呢?
这里用到了Object类的 wait() 和 notify() 方法。
当条件未满足时(list.size() !=5),线程A调用wait() 放弃CPU,并进入阻塞状态。
当条件满足时,线程B调用 notify()通知 线程A,所谓通知线程A,就是唤醒线程A,并让它进入可运行状态。
这种方式的一个好处就是CPU的利用率提高了。
但是也有一些缺点:比如,线程B先执行,一下子添加了5个元素并调用了notify()发送了通知,而此时线程A还执行;当线程A执行并调用wait()时,那它永远就不可能被唤醒了。因为,线程B已经发了通知了,以后不再发通知了。这说明:通知过早,会打乱程序的执行逻辑。
三、while轮询
import org.omg.Messaging.SYNC_WITH_TRANSPORT;
import java.lang.annotation.Native;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
public class test {
public static void main(String[] args) {
MyList list=new MyList();
ThreadA a=new ThreadA(list);
a.setName("A");
a.start();
ThreadB b=new ThreadB(list);
b.setName("B");
b.start();
}
}
class MyList{
private List<String> list=new ArrayList<>();
public void add(){
list.add("elements");
}
public int size(){
return list.size();
}
}
class ThreadA extends Thread{
private MyList list;
public ThreadA(MyList list){
super();
this.list=list;
}
@Override
public void run(){
try{
for(int i=0;i<10;i++){
list.add();
System.out.println("添加了:"+(i+1)+"个元素");
Thread.sleep(1000);
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
class ThreadB extends Thread{
private MyList list;
public ThreadB(MyList list){
super();
this.list=list;
}
@Override
public void run(){
try{
while(true){
if(list.size()==5){
System.out.println("线程B准备退出");
throw new InterruptedException();
}
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
在这种方式下,线程A不断改变条件,线程B不断通过while语句检测这个条件(list.size()==5)是否成立,从而实现了线程间的通信。但是这种方式会浪费CPU资源。
同时这种方式还存在另外一个问题,这就是轮询条件的可见性问题:
线程都是先把变量读取到本地线程栈空间,然后再去修改的本地变量。因此,如果线程B每次都在取本地的条件变量,那么尽管另外一个线程已经改变了轮询的条件,它也察觉不到,这样会造成死循环。
25行对list加上volatile限制后,这时程序会正常退出,不会陷入死循环。
private volatile List<String> list=new ArrayList<>();
四、管道通信
利用java.io.PipedInputStream和java.io.PipedOutputStream进行通信。
生产者每5s提供5个产品,放入管道
消费者每0.5s从管道中取一件产品,并打印剩余产品数量,打印产品信息(以数字替代)
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class test{
public static void main(String[] args) throws InterruptedException {
PipedOutputStream pos=new PipedOutputStream();
PipedInputStream pis=new PipedInputStream();
try {
pis.connect(pos);
}catch (IOException e){
e.printStackTrace();
}
new MyProducer(pos).start();
new MyConsumer(pis).start();
}
}
class MyProducer extends Thread{
private PipedOutputStream outputStream;
private int index=0;
public MyProducer(PipedOutputStream outputStream){
this.outputStream=outputStream;
}
@Override
public void run(){
while(true){
try{
for(int i=0;i<5;i++){
outputStream.write(index++);
}
}catch (IOException e){
e.printStackTrace();
}
try {
Thread.sleep(1000);
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
class MyConsumer extends Thread{
private PipedInputStream inputStream;
public MyConsumer(PipedInputStream inputStream){
this.inputStream=inputStream;
}
@Override
public void run(){
while (true){
try {
Thread.sleep(500);
}catch (InterruptedException e){
e.printStackTrace();
}
try {
int count=inputStream.available();
if(count>0){
System.out.println("rest product cout"+count);
System.out.println("get product"+inputStream.read());
}
}catch (IOException e1){
e1.printStackTrace();
}
}
}
}
https://blog.csdn.net/Hadwin1991/article/details/73527835